标签:期望 .com lin odi sem callback blog rom read
一 多进程编程
Python实现多进程的方式有两种:一种方法是os模块中的fork方法,另一种是使用multiprocessing模块。
前者仅适用于LINUX/UNIX操作系统,对Windows不支持,后者则是跨平台的实现方式。
第一种方式:使用os模块中的fork方式实现多进程
import os if __name__ == ‘__main__‘: print ‘current Process (%s) start ...‘%(os.getpid()) pid = os.fork() if pid < 0: print ‘error in fork‘ elif pid == 0: print ‘I am child process(%s) and my parent process is (%s)‘,(os.getpid(),os.getppid()) else: print ‘I(%s) created a chlid process (%s).‘,(os.getpid(),pid)
第二种方式:multiprocessing
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
Process.PID中保存有PID,如果进程还没有start(),则PID为None。
window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。join()方法实现进程间的同步。
#__author: greg #date: 2017/9/19 23:52 from multiprocessing import Process import time def f(name): time.sleep(1) print(‘hello‘, name,time.ctime()) if __name__ == ‘__main__‘: p_list=[] for i in range(3): p = Process(target=f, args=(‘alvin‘,)) p_list.append(p) p.start() for i in p_list: i.join() print(‘end‘)#一个主进程,三个子进程 # output: # hello alvin Fri Nov 24 19:10:08 2017 # hello alvin Fri Nov 24 19:10:08 2017 # hello alvin Fri Nov 24 19:10:08 2017 # end
类式调用:
#__author: greg #date: 2017/9/21 20:02 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print (‘hello‘, self.name,time.ctime())
if __name__ == ‘__main__‘: p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print(‘end‘) #output: # hello MyProcess-1 Fri Nov 24 19:12:17 2017 # hello MyProcess-2 Fri Nov 24 19:12:17 2017 # hello MyProcess-3 Fri Nov 24 19:12:17 2017 # end
显示进程ID号:
#__author: greg #date: 2017/9/21 20:16 from multiprocessing import Process import os import time def info(title): print(title) print(‘module name:‘, __name__) print(‘parent process:‘, os.getppid())#父进程号 print(‘process id:‘, os.getpid())#进程号 def f(name): info(‘\033[31;1mfunction f\033[0m‘) print(‘hello‘, name) if __name__ == ‘__main__‘: info(‘\033[32;1mmain process line\033[0m‘) time.sleep(10) p = Process(target=info, args=(‘bob‘,)) p.start() p.join() #output: # main process line # module name: __main__ # parent process: 1548 pycharm的进程号 # process id: 8416 Python进程号 # bob # module name: __mp_main__ # parent process: 8416 Python进程号 # process id: 5556 info进程号
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
authkey
daemon:和线程的setDeamon功能一样
exitcode(进程在运行时为None、如果为–N,表示被信号N结束)
name:进程名字。
pid:进程号。
三 进程间通讯
不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:
Queues 用来在多个进程间通信:
1. 阻塞模式
import queue import time q = queue.Queue(10) #创建一个队列 start=time.time() for i in range(10): q.put(‘A‘) time.sleep(0.5) end=time.time() print(end-start)
这是一段极其简单的代码(另有两个线程也在操作队列q),我期望每隔0.5秒写一个‘A‘到队列中,但总是不能如愿:
间隔时间有时会远远超过0.5秒。
原来,Queue.put()默认有 block = True 和 timeout两个参数。
源码:def put(self, item, block=True, timeout=None):
当 block = True 时,写入是阻塞式的,阻塞时间由 timeout确定。
当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。
Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。
但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常。
#__author: greg #date: 2017/9/21 22:27 from multiprocessing import Process, Queue def f(q,n): q.put([42, n, ‘hello‘]) print(‘subprocess id‘,id(q)) if __name__ == ‘__main__‘: q = Queue() p_list=[] print(‘process id‘,id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p_list.append(p) p.start() print(q.get()) print(q.get()) print(q.get()) for i in p_list: i.join() # output # process id 2284856854176 # subprocess id 2607348001872 # [42, 0, ‘hello‘] # subprocess id 1712786975824 # [42, 2, ‘hello‘] # subprocess id 2254764977120 # [42, 1, ‘hello‘]
Pipe常用来两个进程间进行通信,两个进程分别位于管道的两端
def f(conn): conn.send([42, None, ‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, ‘hello‘]" p.join()
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
#__author: greg #date: 2017/9/21 22:57 import multiprocessing import random import time,os def proc_send(pipe,urls): for url in urls: print("Process(%s) send: %s" %(os.getpid(),url)) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print("Process(%s) rev:%s" %(os.getpid(),pipe.recv())) time.sleep(random.random()) if __name__=="__main__": pipe=multiprocessing.Pipe() p1=multiprocessing.Process(target=proc_send,args=(pipe[0],[‘url_‘+str(i) for i in range(10)])) p2=multiprocessing.Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.terminate()
Manager()返回的管理器对象控制一个服务器进程,该进程持有Python对象,并允许其他进程使用代理来操纵它们。
#__author: greg #date: 2017/9/21 23:10 from multiprocessing import Process, Manager def f(d, l,n): d[n] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.append(n) # print(l) if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
四 进程同步
当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。
#__author: greg #date: 2017/9/21 23:25 from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print(‘hello world‘, i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
五 进程池 Pool类
Pool可以提供指定数量的进程供用户使用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程来执行该请求
但如果池中的进程数已经达到规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。
# -*- coding: utf-8 -*- # 2017/11/24 20:15 from multiprocessing import Pool import os, time, random def run_task(name): print(‘Task %s (pid = %s) is running...‘ % (name, os.getpid())) time.sleep(random.random() * 3) print(‘Task %s end.‘ % name) if __name__==‘__main__‘: print(‘Current process %s.‘ % os.getpid()) p = Pool(processes=3) for i in range(5): p.apply_async(run_task, args=(i,)) print(‘Waiting for all subprocesses done...‘) p.close() p.join() print(‘All subprocesses done.‘) """ Current process 9788. Waiting for all subprocesses done... Task 0 (pid = 5916) is running... Task 1 (pid = 3740) is running... Task 2 (pid = 6964) is running... Task 2 end. Task 3 (pid = 6964) is running... Task 1 end. Task 4 (pid = 3740) is running... Task 0 end. Task 3 end. Task 4 end. All subprocesses done. """
每次最多运行3个进程,当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid可以看出来。
标签:期望 .com lin odi sem callback blog rom read
原文地址:http://www.cnblogs.com/gregoryli/p/7892222.html