Problem addressed: speed up data processing with parallel computing.
First, a quick primer: parallel computing means running many tasks at the same time. A thread is the basic unit of execution inside a process; one process can contain many threads, and those threads share the process's resources and address space. Processes, by contrast, are independent of one another: they do not interfere with each other and cannot share resources or address space.
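One quick way to see this process isolation (a minimal sketch, not from the original post; the counter variable is illustrative): each worker process gets its own copy of a global, so changes made inside the pool never reach the parent.

import multiprocessing

counter = 0   # global in the parent process

def bump(x):
    global counter
    counter += 1          # increments the worker's own copy only
    return counter

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        print(pool.map(bump, range(4)))   # each worker counts independently
    print(counter)        # still 0: processes do not share the parent's memory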
A quick guide to writing parallel programs in Python: https://blog.csdn.net/NNNJ9355/article/details/103774314
Second, before writing parallel code you need to know how many CPU cores your machine has (see https://jingyan.baidu.com/article/4f34706e1c6757e386b56d57.html); this is what you fill in as the maximum number of cores in the programs below.
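You can also query the core count from Python itself rather than checking it in the operating system; a small sketch using only the standard library:

import multiprocessing
import os

print(multiprocessing.cpu_count())   # number of logical CPU cores
print(os.cpu_count())                # the same figure via the os module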
Description of the task: there are 4096 .csv files, each containing data, and we compute the correlation coefficient between every pair of files (4096 × 4095 / 2 ≈ 8.4 million pairs).
Originally one computation took 8 hours; after switching to 6-core parallelism, the processing time dropped to about an hour and a half, a large speedup.
Data format: [figure of the sample data in the original post]
Python provides the multiprocessing library for parallel computing.
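In its simplest form, a process pool maps a function over an iterable and returns the results in order. A minimal sketch (the square function and values are illustrative):

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":            # guard required on Windows
    with multiprocessing.Pool(4) as pool:
        print(pool.map(square, range(10)))   # [0, 1, 4, ..., 81]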
Program 1: cross-correlation between the learning neurons in a spontaneous-firing recording
import pandas as pd
import numpy as np
import os
import math
import multiprocessing            # process pool for multiprocessing
from functools import partial
from time import time


def get_sum(num1, num2, array):
    # count the elements of a sorted array that fall inside [num1, num2];
    # stop early once the values pass num2
    count = 0
    for n in array:
        if num1 <= n <= num2:
            count = count + 1
        if n > num2:
            break
    return count


# walk the folder and collect every .csv file in it
def get_file(path):
    files = os.listdir(path)
    file_list = []
    for file in files:
        filename = os.path.join(path, file)
        if not os.path.isdir(filename):   # skip sub-folders
            file_list.append(filename)
    return file_list


def get_csvarr(csvPath):
    allFileNames = get_file(csvPath)                 # the 4096 .csv files
    larr = np.array(allFileNames)
    csvPathArr = np.reshape(larr, (larr.shape[0]))   # array of .csv paths
    return csvPathArr


def get_learnArr(lpPath):
    pdata = pd.read_csv(lpPath, header=None)
    plarr = np.array(pdata[1:])                      # labels of the learning neurons
    plarr = np.reshape(plarr, (plarr.shape[0]))
    return plarr


def read_csv(csvfilePath):
    data = pd.read_csv(csvfilePath, header=None)
    data = np.array(data)
    data = np.round(data, 3)
    rd, cd = data.shape
    csvdata = np.reshape(data, cd)
    return csvdata


def Cal_first2067CrossMartix(csvPathArr, lparr, threeit):
    # csvPathArr: array of electrode .csv paths; lparr: array of learning labels;
    # threeit: (reference electrode path, learning label, electrode index)
    refPath = threeit[0]                  # path of the reference electrode
    lpLable = int(float(threeit[1]))      # learning label of the reference electrode
    num = int(threeit[2])                 # index of the reference electrode
    fillzeros = np.zeros(4096)
    if lpLable == 1:
        # a learning neuron: correlate this electrode with every electrode before the diagonal
        ref = read_csv(refPath)
        maxvalueTrain = np.zeros(1)
        t = 0
        for j in lparr[:num + 1]:
            if j == 1 and t != num:       # target electrodes labelled 1 before the diagonal
                target = read_csv(csvPathArr[t])
                # cross-correlate the reference electrode with the target,
                # looping over every spike of the reference electrode
                refspiketrain = np.zeros(50)
                for r in ref:
                    rbegin = r - 0.05                     # left edge of the window around the spike
                    refspikeevery = []
                    for b in range(50):
                        bleft = rbegin + b * 0.002        # start of each 2 ms bin
                        bright = rbegin + (b + 1) * 0.002
                        bl = get_sum(bleft, bright, target)  # target spikes inside this bin
                        refspikeevery.append(bl)
                    refspiketrain = np.row_stack((refspiketrain, refspikeevery))
                refspiketrain = np.delete(refspiketrain, 0, axis=0)  # drop the zero seed row
                a = []
                for b in range(50):
                    p = refspiketrain[:, b]   # sum each column
                    a.append(sum(p))
                a = np.array(a)
                c = math.sqrt(len(ref) * len(target))
                a = a / c
                maxvalue = max(a)
                maxvalueTrain = np.column_stack((maxvalueTrain, maxvalue))
            else:
                maxvalueTrain = np.column_stack((maxvalueTrain, 0))
            t = t + 1
        row, col = maxvalueTrain.shape
        lenzero = 4096 - col
        fillminones = np.full((1, lenzero), -1)   # pad the rest of the row with -1
        maxvalueTrain = np.column_stack((maxvalueTrain, fillminones))
    else:
        # not a learning neuron: its correlations with all other neurons are 0
        maxvalueTrain = fillzeros
    print(num)                            # progress indicator: one row done
    return maxvalueTrain


def main():
    dataPath = r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22-10'
    lpPath = r'E:\\数据文件\SPON\LN\LN26CombineCol.csv'   # positions of the learning neurons
    csvPathArr = get_csvarr(dataPath)
    lparr = get_learnArr(lpPath)
    num = np.array(range(4096))           # one index per electrode
    itArr = np.row_stack((csvPathArr, lparr))
    itArr = np.row_stack((itArr, num))
    itArr = np.transpose(itArr)           # one (path, label, index) triple per row
    iterable = []
    for row in itArr:
        iterable.append(tuple(row))
    pool = multiprocessing.Pool(6)        # number of cores
    func = partial(Cal_first2067CrossMartix, csvPathArr, lparr)
    everowCoeff = pool.map(func, iterable)      # one row of coefficients per electrode
    coeffMatrix = np.row_stack(everowCoeff)     # assemble the 4096 x 4096 matrix
    return coeffMatrix


if __name__ == "__main__":
    begin_time = time()
    coeffMatrix = main()
    coeffMatrix = pd.DataFrame(data=coeffMatrix)
    coeffMatrix.to_csv(r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22coeffmatrix\new2211coeffmatrix.csv',
                       index=False, header=None)
    end_time = time()
    run_time = end_time - begin_time
    print('Program run time:', run_time)
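A side note on the inner loop: because the spike times in each file are sorted, the 50 calls to get_sum per reference spike could also be replaced with a single np.searchsorted over all bin edges. This is a hedged sketch of an alternative, not the method used above, and it counts spikes in half-open bins [edge, next_edge) rather than the inclusive windows of get_sum:

import numpy as np

def bin_counts(target, r):
    # target: sorted 1-D array of spike times; r: one reference spike time
    edges = r - 0.05 + 0.002 * np.arange(51)   # 51 edges -> 50 bins of 2 ms
    idx = np.searchsorted(target, edges)       # insertion position of each edge
    return np.diff(idx)                        # spike count per bin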
To wrap up, here are the errors I ran into while writing the program and some points to note:
1. If a function takes both varying arguments and fixed constants, you can use functools.partial, which wraps the original function into a new one with the constants pre-bound. Format: new_func = partial(original_func, const1, ..., constN). For example: func = partial(Cal_first2067CrossMartix, csvPathArr, lparr). The bound constants can be of any type, such as str or list (see Example 1 below).
2. The format of the initial values handed to the parallel function: a plain list such as [1, 2, 3, 4] works for one variable; for several variables, use a list of tuples such as [(1, 2), (2, 3), (3, 4)], with one tuple element per variable. Remember that partial has just wrapped the function: when you define the parallel function, the constant parameters must be declared before the varying one, otherwise the argument order gets scrambled. pool.map(func, iterable) then maps the function over all the initial values, as the excerpt below shows.
def Cal_first2067CrossMartix(csvPathArr, lparr, threeit):    # the parallel worker function
func = partial(Cal_first2067CrossMartix, csvPathArr, lparr)  # rewrap it with partial
everowCoeff = pool.map(func, iterable)                       # map the initial values
Example 1:
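A minimal sketch of the partial + pool.map pattern (the scale function and its values are illustrative, not from the original post):

import multiprocessing
from functools import partial

def scale(factor, offset, x):        # constants first, the mapped variable last
    return factor * x + offset

if __name__ == "__main__":
    func = partial(scale, 10, 1)     # bind factor=10 and offset=1
    with multiprocessing.Pool(2) as pool:
        print(pool.map(func, [1, 2, 3]))   # [11, 21, 31]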
Example 2:
# -*- coding:utf-8 -*-
import time
import multiprocessing

def job(x, y):
    return x * y

def job1(z):
    # unpack the (x, y) tuple and forward it to job
    return job(z[0], z[1])

if __name__ == "__main__":
    time1 = time.time()
    pool = multiprocessing.Pool(2)
    data_list = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10)]
    res = pool.map(job1, data_list)
    time2 = time.time()
    print(res)
    pool.close()
    pool.join()
    print('Total time: ' + str(time2 - time1) + 's')
Reference blog: https://blog.csdn.net/u013421629/article/details/100284962
3. Adjusting the number of cores
pool = multiprocessing.Pool(6)   # number of cores (set this to match your own machine)
Program 2: a test of the correlation coefficient between two electrodes in a spontaneous-firing recording (can also output figures)
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import os
import math
from time import time

# Test several electrodes from one spontaneous recording
'''
# walk the folder and collect every .csv file in it
def get_file(path):
    files = os.listdir(path)
    file_list = []
    for file in files:
        filename = os.path.join(path, file)
        if not os.path.isdir(filename):   # skip sub-folders
            file_list.append(filename)
    return file_list

def get_sum(num1, num2, array):
    count = 0
    for n in array:
        if num1 <= n <= num2:
            count = count + 1
    return count

path = r'E:\\数据文件\3-29Data(20211111)\SPON\SPON22'
lPath = r'E:\\数据文件\SPON\LN\LN11CombineCol.csv'
ldata = pd.read_csv(lPath, header=None)
plarr = np.reshape(ldata, (ldata.shape[0]))
allFiles = get_file(path)
for al in allFiles[:10]:
'''

'''
# simple version: scans the whole array
def get_sum(num1, num2, array):
    count = 0
    for n in array:
        if num1 <= n <= num2:
            count = count + 1
    return count
'''

# faster version: the spike times are sorted, so stop once the values pass num2
def get_sum(num1, num2, array):
    count = 0
    for n in array:
        if num1 <= n <= num2:
            count = count + 1
        if n > num2:
            break
    return count

def read_csv(csvfilePath):
    data = pd.read_csv(csvfilePath, header=None)
    data = np.array(data)
    # data = np.round(data, 3)
    rd, cd = data.shape
    csvdata = np.reshape(data, cd)
    return csvdata

# Test two electrodes
begin_time = time()
pcsv1 = r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22-10\cont_Row_01_Col_05_nr.csv'
pcsv2 = r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22-10\cont_Row_01_Col_12_nr.csv'
ref = read_csv(pcsv1)
maxvalueTrain = np.zeros(1)
target = read_csv(pcsv2)
refspiketrain = np.zeros(50)
for r in ref:
    rbegin = r - 0.05                     # left edge of the window around the spike
    # rbegin = np.round(rbegin, 3)
    refspikeevery = []
    for b in range(50):
        bleft = rbegin + b * 0.002        # start of each 2 ms bin
        bright = rbegin + (b + 1) * 0.002
        # bl = list(filter(lambda x: bleft <= x <= bright, target))
        bl = get_sum(bleft, bright, target)  # target spikes inside this bin
        refspikeevery.append(bl)
    refspiketrain = np.row_stack((refspiketrain, refspikeevery))
refspiketrain = np.delete(refspiketrain, 0, axis=0)   # drop the zero seed row
# saveTrain = pd.DataFrame(data=refspiketrain)
# saveTrain.to_csv(r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22coeffmatrix\saveTrain.csv', index=False, header=None)
a = []
for b in range(50):
    p = refspiketrain[:, b]   # sum each column
    a.append(sum(p))
a = np.array(a)
c = math.sqrt(len(ref) * len(target))
a = a / c
maxvalue = max(a)
print(maxvalue)
end_time = time()
run_time = end_time - begin_time
print('Program run time:', run_time)

'''
# small check of the early-break counting logic
A = [1, 2, 4, 5]
k = 0
count = 0
for i in A:
    if 0 < i < 3:
        count = count + 1
    else:
        break
    k = k + 1
print(count)
'''
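Since the spike times are sorted, the counting can be taken one step further with the standard-library bisect module, which locates both window edges by binary search instead of scanning element by element. A hedged sketch of this alternative (get_sum_bisect is a name introduced here, not from the original post):

from bisect import bisect_left, bisect_right

def get_sum_bisect(num1, num2, array):
    # count the elements of the sorted array inside [num1, num2] in O(log n)
    return bisect_right(array, num2) - bisect_left(array, num1)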
Program 3: cross-correlation between all neurons
import pandas as pd
import numpy as np
import os
import math
import multiprocessing            # process pool for multiprocessing
from functools import partial
from time import time


def get_sum(num1, num2, array):
    # count the elements of a sorted array inside [num1, num2], stopping early
    count = 0
    for n in array:
        if num1 <= n <= num2:
            count = count + 1
        if n > num2:
            break
    return count


# walk the folder and collect every .csv file in it
def get_file(path):
    files = os.listdir(path)
    file_list = []
    for file in files:
        filename = os.path.join(path, file)
        if not os.path.isdir(filename):   # skip sub-folders
            file_list.append(filename)
    return file_list


def get_csvarr(csvPath):
    allFileNames = get_file(csvPath)                 # the 4096 .csv files
    larr = np.array(allFileNames)
    csvPathArr = np.reshape(larr, (larr.shape[0]))   # array of .csv paths
    return csvPathArr


def read_csv(csvfilePath):
    data = pd.read_csv(csvfilePath, header=None)
    data = np.array(data)
    data = np.round(data, 3)
    rd, cd = data.shape
    csvdata = np.reshape(data, cd)
    return csvdata


def Cal_allNeuronCrossMartix(csvPathArr, threeit):
    # csvPathArr: array of electrode .csv paths
    # threeit: (reference electrode path, electrode index)
    refPath = threeit[0]              # path of the reference electrode
    num = int(threeit[1])             # index of the reference electrode
    ref = read_csv(refPath)
    maxvalueTrain = np.zeros(1)
    t = 0
    for j in csvPathArr[:num + 1]:
        if t != num:                  # every target electrode before the diagonal
            target = read_csv(j)
            # cross-correlate the reference electrode with the target,
            # looping over every spike of the reference electrode
            refspiketrain = np.zeros(50)
            for r in ref:
                rbegin = r - 0.05                     # left edge of the window around the spike
                refspikeevery = []
                for b in range(50):
                    bleft = rbegin + b * 0.002        # start of each 2 ms bin
                    bright = rbegin + (b + 1) * 0.002
                    bl = get_sum(bleft, bright, target)  # target spikes inside this bin
                    refspikeevery.append(bl)
                refspiketrain = np.row_stack((refspiketrain, refspikeevery))
            refspiketrain = np.delete(refspiketrain, 0, axis=0)  # drop the zero seed row
            a = []
            for b in range(50):
                p = refspiketrain[:, b]   # sum each column
                a.append(sum(p))
            a = np.array(a)
            c = math.sqrt(len(ref) * len(target))
            a = a / c
            maxvalue = max(a)
            maxvalueTrain = np.column_stack((maxvalueTrain, maxvalue))
        else:
            maxvalueTrain = np.column_stack((maxvalueTrain, 0))   # 0 on the diagonal
        t = t + 1
    row, col = maxvalueTrain.shape
    lenzero = 4096 - col
    fillminones = np.full((1, lenzero), -1)   # pad the rest of the row with -1
    maxvalueTrain = np.column_stack((maxvalueTrain, fillminones))
    print(num)                        # progress indicator: one row done
    return maxvalueTrain


def main():
    dataPath = r'E:\\数据文件\3-23Data(20211126)\CIRCLE\SPON\spon22'
    csvPathArr = get_csvarr(dataPath)
    num = np.array(range(4096))               # one index per electrode
    itArr = np.row_stack((csvPathArr, num))   # format: (.csv path, index)
    itArr = np.transpose(itArr)
    iterable = []
    for row in itArr:
        iterable.append(tuple(row))
    # iterable = iterable[:13]                # uncomment to test on a few electrodes
    pool = multiprocessing.Pool(6)            # number of cores
    func = partial(Cal_allNeuronCrossMartix, csvPathArr)
    everowCoeff = pool.map(func, iterable)    # one row of coefficients per electrode
    coeffMatrix = np.row_stack(everowCoeff)   # assemble the 4096 x 4096 matrix
    return coeffMatrix


if __name__ == "__main__":
    begin_time = time()
    coeffMatrix = main()
    coeffMatrix = pd.DataFrame(data=coeffMatrix)
    coeffMatrix.to_csv(r'E:\\数据文件\3-23Data(20211126)\CIRCLE\spon22coeffmatrix\allnew221coeffmatrix.csv',
                       index=False, header=None)
    end_time = time()
    run_time = end_time - begin_time
    print('Program run time:', run_time)
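One last note: unlike Example 2, the two big programs never call pool.close() and pool.join(). A tidy alternative is to run the pool as a context manager, which tears the workers down automatically when the block exits; a sketch reusing the names from Program 3:

with multiprocessing.Pool(6) as pool:
    everowCoeff = pool.map(func, iterable)   # map() blocks until every result is in
# leaving the with-block shuts the worker processes down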
Original post: https://www.cnblogs.com/smile621sq/p/14640876.html