multiprocessing模块是Python提供的用于多进程开发的包,multiprocessing包提供本地和远程两种并发,通过使用子进程而非线程有效地回避了全局解释器锁。
(一)创建进程Process 类
创建进程的类,其源码在multiprocessing包的process.py里,有兴趣的可以对照着源码边理解边学习。它的用法同threading.Thread差不多,从它的类定义上就可以看的出来,如下:
class Process(object): ‘‘‘ Process objects represent activity that is run in a separate process The class is analagous to `threading.Thread` ‘‘‘ _Popen = None def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): assert group is None, ‘group argument must be None for now‘ count = _current_process._counter.next() self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None self._target = target self._args = tuple(args) self._kwargs = dict(kwargs) self._name = name or type(self).__name__ + ‘-‘ + ‘:‘.join(str(i) for i in self._identity)
Process([group [, target [, name [, args [, kwargs]]]]])
group实质上不使用,是保留项,便于以后扩展。
target表示调用对象,
args表示调用对象的位置参数元组
kwargs表示调用对象的字典
name为别名,即进程的名字
它的方法/属性跟threading.Thread也有很多类似的地方,主要有:
start():开始进程活动。
run():表示进程的活动方法,可以在子类中覆盖它。
join([timeout]):是用来阻塞当前上下文,直至该进程运行结束,一个进程可以被join()多次,timeout单位是秒。
terminate():结束进程。在Unix上使用的是SIGTERM,在Windows平台上使用TerminateProcess
is_alive():判断进程是否还活着。
name:一个字符串,表示进程的名字,也可以通过赋值语句利用它来修改进程的名字
ident:进程的ID,如果进程没开始,结果是None
pid:同ident,大家可以看看ident和pid的实现,是利用了os模块的getpid()方法。
authkey:设置/获取进程的授权密码。当初始化多进程时,使用os.urandom()为主进程分配一个随机字符串。当创建一个Process对象时,它将继承其父进程的认证密钥, 但是可以通过将authkey设置为另一个字节字符串来改变。这里authkey为什么既可以设置授权密码又可以获取呢?那是因为它的定义使用了property装饰器,源码如下:
@property def authkey(self): return self._authkey @authkey.setter def authkey(self, authkey): ‘‘‘ Set authorization key of process ‘‘‘ self._authkey = AuthenticationString(authkey)
这是property的一个高级用法,如果理解了其实也很简单,有兴趣的去查看其它资料。
daemon:一个布尔值,指示进程是(True)否(False)是一个守护进程。它必须在调用start()之前设置,否则会引发RuntimeError。它的初始值继承自创建它的进程;进程不是一个守护进程,所以在进程中创建的所有进程默认daemon = False。
exitcode:返回进程退出时的代码。进程运行时其值为None,如果为–N,表示被信号N结束。
(1)一个简单的单进程例子
#coding=utf-8 import multiprocessing import datetime import time def worker(interval): n = 5 while n > 0: print "The now is %s"% datetime.datetime.now() time.sleep(interval) n -= 1 if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start()#开始进程 #p.terminate()#结束进程 #p.join(9)#阻塞当前上下文 print "p.authkey",p.authkey#获取进程的授权密码 p.authkey = u"123"#设置进程的授权密码 print "p.authkey", p.authkey#获取进程的授权密码 print "p.pid:", p.pid,p.ident#进程ID p.name = ‘helloworld‘#修改进程名字 print "p.name:", p.name#进程名字 print "p.is_alive:", p.is_alive()#是否是活的
运行结果如下图:
上面的代码有两行注释掉的,大家可以把注释去掉,体会、理解这两个方法的用处,在此不贴我的运行结果了。
(2)自定义进程类,并开启多个进程
import multiprocessing import datetime import time class MyProcess(multiprocessing.Process): """ 自定义进程类 """ def __init__(self,interval,group=None,target=None,name=None,args=(),kwargs={}): multiprocessing.Process.__init__(self,group,target,name,args,kwargs=kwargs) self.interval = interval def run(self): n = 5 while n > 0: print("the time is %s"%datetime.datetime.now()) time.sleep(self.interval) n -= 1 def worker_1(interval): print "worker_1" time.sleep(interval) print "end worker_1" def worker_2(interval): print "worker_2" time.sleep(interval) print "end worker_2" def worker_3(interval): print "worker_3" time.sleep(interval) print "end worker_3" if __name__ == "__main__": p1 = MyProcess(interval=2,target = worker_1, args = (2,)) p2 = MyProcess(interval=2,target = worker_2, args = (3,)) p3 = MyProcess(interval=2,target = worker_3, args = (4,)) p1.start() p2.start() p3.start() print "current process",multiprocessing.current_process(),multiprocessing.active_children() print("The number of CPU is:" + str(multiprocessing.cpu_count())) for p in multiprocessing.active_children(): print("child p.name:" + p.name + "\tp.id" + str(p.pid)) print "END!!!!!!!!!!!!!!!!!"
运行结果如下:
看看打印出来的时间,三个进程应该是并行执行的。
(二)进程间通信
multiprocessing模块支持两种进程间的通信方式:Queue(队列)和Pipe(管道)。
(1)Queue
multiprocessing中的Queue类的定义在queues.py文件里。和Queue.Queue差不多,multiprocessing中的Queue类实现了Queue.Queue的大部分方法(可以参考上篇博文Python:线程、进程与协程(3)——Queue模块及源码分析),但task_done()和join()没有实现,主要方法和属性有
qsize():返回Queue的大小
empty():返回一个布尔值,表示Queue是否为空
full():返回一个布尔值,表示Queue是否满
put(item[, block[, timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。
put_nowait(item):等价与put(item,False)。
get([block[, timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。
get_nowait():等价于get(False)
close():表示该Queue不在加入新的元素
join_thread():加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。默认情况下,如果进程不是队列的创建者,则退出, 它将尝试加入队列的后台线程。 该进程可以调用cancel_join_thread()来做
cancel_join_thread():在阻塞中阻止join_thread(),防止后台线程在进程退出时被自动连接 ,肯能会导致数据丢失。
(2)Pipe
Pipe不是类,是函数,该函数定义在 multiprocessing中的connection.py里,函数原型Pipe(duplex=True),
返回一对通过管道连接的连接对象conn1和conn2。
如果duplex是True(默认值),则管道是双向的。
如果duplex是False,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息。
Pipe()返回的两个连接对象表示管道的两端,每个连接对象都有send()和recv()方法(还有其它方法),分别是发送和接受消息。下面举个简单的例子,一个发送数据,一个接受数据
#coding=utf-8 import multiprocessing import time def proc1(pipe): """ 发送数据 """ while True: for i in xrange(100): print "send: %s" %(i) pipe.send(i)#发送数据 time.sleep(1) def proc2(pipe): """ 接收数据 """ while True: print "proc2 rev:", pipe.recv()#接受数据 time.sleep(1) if __name__ == "__main__": pipe1,pipe2 = multiprocessing.Pipe()#返回两个连接对象 p1 = multiprocessing.Process(target=proc1, args=(pipe1,)) p2 = multiprocessing.Process(target=proc2, args=(pipe2,)) p1.start() p2.start() p1.join() p2.join()
运行结果如下:
(三)进程间的同步
multiprocessing包含与threading中所有同步原语等同的原语,它也有Lock,RLock,Even,Condition,Semaphore,BoundedSemaphore。用法都差不多,它们的定义在 multiprocessing包的synchronize.py文件里,在此不过多介绍,有兴趣的可以参考Python:线程、进程与协程(2)——threading模块里相关的概念理解。如果理解了相关概念,在 multiprocessing模块中使用是一样的,看下面这个简单的例子吧,有两个进程要向某个文件中写入内容,为了避免访问冲突,可以使用锁。
#coding=utf-8 import multiprocessing def worker_with(lock, f): with lock:#Lock等对象也是支持上下文管理器协议的。 fs = open(f, ‘a+‘) n = 10 while n > 1: fs.write("Lockd acquired via with\n") n -= 1 fs.close() def worker_no_with(lock, f): lock.acquire() try: fs = open(f, ‘a+‘) n = 10 while n > 1: fs.write("Lock acquired directly\n") n -= 1 fs.close() finally: lock.release() if __name__ == "__main__": lock = multiprocessing.Lock()#定义锁 f = "/home/liulonghua/files.txt" w = multiprocessing.Process(target = worker_with, args=(lock, f)) nw = multiprocessing.Process(target = worker_no_with, args=(lock, f)) w.start() nw.start() print "end"
multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。下篇博文再接着讲进程共享和进程池等。
Python:线程、进程与协程(4)——multiprocessing模块(1)
原文地址:http://11026142.blog.51cto.com/11016142/1873260