通常,进程之间彼此是完全孤立的,唯一的通信方式是队列或管道。但可以使用两个对象来表示共享数据。其实,这些对象使用了共享内存(通过mmap模块)使访问多个进程成为可能。
Value( typecode, arg1, … argN, lock )
在共享内容中常见ctypes对象。typecode要么是包含array模块使用的相同类型代码(如’i’,’d’等)的字符串,要么是来自ctypes模块的类型对象(如ctypes.c_int、ctypes.c_double等)。所有额外的位置参数arg1, arg2 ….. argN将传递给指定类型的构造函数。lock是只能使用关键字调用的参数,如果把它置为True(默认值),将创建一个新的锁定来包含对值的访问。如果传入一个现有锁定,比如Lock或RLock实例,该锁定将用于进行同步。如果v是Value创建的共享值的实例,便可使用v.value访问底层的值。例如,读取v.value将获取值,而赋值v.value将修改值。
RawValue( typecode, arg1, … ,argN)
同Value对象,但不存在锁定。
Array( typecode, initializer, lock )
在共享内存中创建ctypes数组。typecode描述了数组的内容,意义与Value()函数中的相同。initializer要么是设置数组初始大小的整数,要么是项目序列,其值和大小用于初始化数组。lock是只能使用关键字调用的参数,意义与Value()函数中相同。如果a是Array创建的共享数组的实例,便可使用标准的python索引、切片和迭代操作访问它的内容,其中每种操作均由锁定进行同步。对于字节字符串,a还具有a.value属性,可以吧整个数组当做一个字符串进行访问。
RawArray(typecode, initializer )
同Array对象,但不存在锁定。当所编写的程序必须一次性操作大量的数组项时,如果同时使用这种数据类型和用于同步的单独锁定(如果需要的话),性能将得到极大的提升。
除了使用Value()和Array()创建的共享值之外,multiprocessing模块还提供一下同步源于的共享版本。
![这里写图片描述](http://img.blog.csdn.net/20150830164730833)
这些对象的行为与threading模块中定义的名称相同的同步原语相似。请参考threading文档了解更多细节。
应该注意,使用多进程后,通常不必再担心与锁定、信号量或类似构造的底层同步,这一点与线程不相伯仲。在某种程度上,管道上的send()和receive()操作,以及队列上的put()和get()操作已经提供了同步功能。但是,在某写特定的设置下还是需要用到共享值和锁定。下面这个例子说明了如何使用共享数组代替管道,将一个浮点数的python列表发送给另一个进程:
import multiprocessing
class FloatChannel(object):
def __init__(self,maxsize):
self.buffer=multiprocessing.RawArray(‘d‘,maxsize)
self.buffer_len=multiprocessing.Value(‘i‘)
self.empty=multiprocessing.Semaphore(1)
self.full=multiprocessing.Semaphore(0)
def send(self,values):
self.empty.acquire() #只在缓存为空时继续
nitems=len(values)
self.buffer_len=nitems #设置缓冲区大小
self.buffer[:nitems]=values #将复制到缓冲区中
self.full.release() #发信号通知缓冲区已满
def recv(self):
self.full.acquire() #只在缓冲区已满时继续
values=self.buffer[:self.buffer_len.value] #复制值
self.empty.release() #发送信号通知缓冲区为空
return values
#性能测试 接收多条消息
def consume_test(count,ch):
for i in xrange(count):
values=ch.recv()
#性能测试 发送多条消息
def produce_test(count,values,ch):
for i in xrange(count):
ch.send(values)
if __name__=="__main__":
ch=FloatChannel(100000)
p=multiprocessing.Process(target=consume_test,args=(1000,ch))
p.start()
values=[float(x) for x in xrange(100000)]
produce_test(1000,values,ch)
print "Done"
p.join()
在我的计算机上执行性能测试时,通过FloatChannel发送一个较大的浮点数列表,速度比通过Pipe发送快大约80%,因为后者必须对所有值进行序列化和反序列化。
版权声明:本文为博主原创文章,未经博主允许不得转载。
multiprocessing在python中的高级应用-共享数据与同步
原文地址:http://blog.csdn.net/winterto1990/article/details/48106505