标签:
__author__ = ‘alex‘ #coding:utf-8 from multiprocessing import Process def foo(i): print (i) if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(i,)) p.start()
进程的数据默认不共享
#coding:utf-8 from multiprocessing import Process def foo(i,li): li.append(i) print (‘li is ‘,li) # print (i) li = [] if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(i,li)) p.start()
输出结果:
li is [1] li is [0] li is [3] li is [2] li is [5] li is [7] li is [4] li is [6] li is [8] li is [9]
这个跟我们设想要的结果不一致,我们需要的是for循环中的数据一个个的被append到列表中去,这就涉及到了进程间的通信。
#coding:utf-8 from multiprocessing import Process from multiprocessing import queues import multiprocessing import time def foo(i,q): time.sleep(1) q.put(i) # time.sleep(1) print (‘qsize is ‘,q.qsize()) # print (i) q = queues.Queue(ctx=multiprocessing) if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(i,q)) p.start()
1,使用multiprocessing模块中的queues类,可以实现进程间的通信,输出结果:
qsize is 1 qsize is 2 qsize is 3 qsize is 4 qsize is 5 qsize is 6 qsize is 7 qsize is 8 qsize is 10 qsize is 10
2,也可以利用python的Array来实现,Array较之list的不同在于,数组的元素位置都是在一起的,大小是事先分配好的。
#coding:utf-8 from multiprocessing import Process from multiprocessing import queues import multiprocessing import time from multiprocessing import Array def foo(i,arr): time.sleep(2) arr[i] = i # time.sleep(1) # print (‘qsize is ‘,q.qsize()) # print (i) for item in arr: print (item) print ("===================") arr = Array(‘i‘,10) if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(i,arr)) p.start()
执行最终结果是:
0
1
2
3
4
5
6
7
8
9
但是因为线程之间无法确定哪个线程先执行完的(CPU的调度是随机的),所以刚开始的数据可能是无序的。
3,也可以使用一个特殊的dict()类来实现,但是下面的代码执行有问题,因为是用fork()的机制来产生进程的,所以在windows下面不支持,但是在Linux和MAC中是可以支持的,未测试!
进程间的通信使用的也是socket的方式来实现的,在下面的代码中,main函数里面sleep(2)了2秒,原因是主进程和子进程在通信过程中,如果子进程还没有执行完,但是子进程已经执行完退出了,那么子进程将不能进行消息的收发了,因而不能实现进程间通信了,如果把sleep的时间改到0.1秒,那么可以看到的结果是,有一部分的数据执行出来了,剩余的程序未执行完就报错了。
#coding:utf-8
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
import time
from multiprocessing import Array
from multiprocessing import Manager
manage = Manager()
dic = manage.dict()
def foo(i,dic):
dic[i] = i +100
print (dic.values())
if __name__ == ‘__main__‘:
for i in range(10):
p = Process(target=foo,args=(i,dic))
p.start()
time.sleep(2)
进程跟线程一样,操作同一份数据的时候也需要加锁:
from multiprocessing import Process,Array import time arr = Array(‘i‘,1) arr[0] = 10 def foo(arr): arr[0] -= 1 time.sleep(1) print (arr[0]) if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(arr,)) p.start()
因为添加了sleep(1),所以每次执行循环的时候,每个子进程都执行了一遍减一的操作,最终所有的输出结果都是0
0
0
0
0
0
0
0
0
0
0
因此,我们需要给进程加锁:
from multiprocessing import Process,Array import time from multiprocessing import RLock lk = RLock() arr = Array(‘i‘,1) arr[0] = 10 def foo(arr,lc): lc.acquire() arr[0] -= 1 time.sleep(1) print (arr[0]) lc.release() if __name__ == ‘__main__‘: for i in range(10): p = Process(target=foo,args=(arr,lk)) p.start()
执行结果:
9
8
7
6
5
4
3
2
1
0
进程锁:
from multiprocessing import Pool from multiprocessing import Process import time def foo(arg): time.sleep(1) print (arg) if __name__ == ‘__main__‘: p = Pool(5) for i in range(10): p.apply(func=foo,args=(i,))
执行结果,每隔一秒打印一个数值:
0
1
2
3
4
5
6
7
8
9
改成另外的写法:
from multiprocessing import Pool from multiprocessing import Process import time def foo(arg): time.sleep(1) print (arg) if __name__ == ‘__main__‘: p = Pool(5) for i in range(10): p.apply_async(func=foo,args=(i,)) p.close() p.join()
在执行join方法之前必须先执行close()或者terminate()方法。
执行结果:
0
1
2
3
4
5
6
7
8
9
如果用terminate方法改写:
from multiprocessing import Pool from multiprocessing import Process import time def foo(arg): time.sleep(1) print (arg) if __name__ == ‘__main__‘: p = Pool(5) for i in range(10): p.apply_async(func=foo,args=(i,)) time.sleep(2) p.terminate() # p.close() p.join()
执行结果:
0
1
2
3
4
terminate表示当前的子进程执行完毕就不执行了,close表示所有的进程都执行完毕之后才执行主进程。
标签:
原文地址:http://www.cnblogs.com/python-study/p/5832006.html