标签:producer als empty 完数 lock roc imp port 汇报
和开源软件MQ 很相似
先进先出的逻辑,一个,N个噻, 1个,N个听Q
from multiprocessing import Process,Queue
#队列,先进先出
q=Queue(3)
q.put({‘a‘:1})
q.put(‘b‘)
q.put(‘c‘)
print(q.full()) -->是否 噻满
q.put(‘d‘,False) #等同于q.put_nowait(‘d‘) 塞满后 再 噻,不 等待,直接报错
q.put(‘d‘,timeout=2) #塞满后 再噻 等待2秒, 不能噻就报错
print(q.qsize()) #查看q中 值的个数
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) -->是否取空
print(q.get(block=False)) #等同于q.get_nowait(‘d‘) 听满后 再听,不 等待,直接报错
print(q.get_nowait())
print(q.get(timeout=2)) #听满后 再听 等待2秒, 不能听就报错
==========================模拟真实案例======================
吃包子, 厨师做,客户吃,相互 不干扰
def GET(q,name,li):
print(‘客户‘, name)
for line in li:
time.sleep(1)
print(‘客户吃了‘,q.get())
def PUT(q,name,li):
print(‘厨师‘,name)
for line in li:
time.sleep(1)
print(‘%s生产出%s‘ %(name,line))
q.put(line)
if __name__ == ‘__main__‘:
q=Queue()
li = [‘包子%s‘ % i for i in range(10)]
p=Process(target=PUT,args=(q,‘la‘,li))
p.start()
p1=Process(target=GET,args=(q,‘onda‘,li))
p1.start()
但是问题是,客户吃完后 不释放进程,还在卡住
===============================通过 put 提交一个 固定的值,让 get去 判断======================
def GET(q,name):
while True:
time.sleep(2)
res=q.get()
if res is None :break
print(‘客户%s, 吃了%s‘ %(name,res))
def PUT(q,name,li):
for line in li:
time.sleep(1)
q.put(line)
print(‘厨师 %s 只做了%s‘ %(name,line))
q.put(None)
if __name__ == ‘__main__‘:
q=Queue()
p1=Process(target=GET,args=(q,‘la‘))
p1.start()
li=[‘包子%s‘ %i for i in range(10)]
p2=Process(target=PUT,args=(q,‘onda‘,li))
p2.start()
p1.join()
p2.join()
print(‘主进程‘)
==============================或者使用 JoinableQueue===========
from multiprocessing import Process,JoinableQueue
import time
import random
def consumer(q,name):
while True:
# time.sleep(random.randint(1,3))
res=q.get()
q.task_done()
print(‘\033[41m消费者%s拿到了%s\033[0m‘ %(name,res))
def producer(seq,q,name):
for item in seq:
# time.sleep(random.randint(1,3))
q.put(item)
print(‘\033[42m生产者%s生产了%s\033[0m‘ %(name,item))
q.join()
print(‘============>>‘)
if __name__ == ‘__main__‘:
q=JoinableQueue()
c=Process(target=consumer,args=(q,‘egon‘),)
c.daemon=True #设置守护进程,主进程结束c就结束
c.start()
seq=[‘包子%s‘ %i for i in range(10)]
p=Process(target=producer,args=(seq,q,‘厨师1‘))
p.start()
# master--->producer----->q--->consumer(10次task_done)
p.join() #主进程等待p结束,p等待c把数据都取完,c一旦取完数据,p.join就是不再阻塞,进
# 而主进程结束,主进程结束会回收守护进程c,而且c此时也没有存在的必要了
print(‘主进程‘)
get()中
q.task_done() 每吃完一个,都会 汇报一下,直到吃完,
put()中
q.join(),等待 get()吃完, put就可以 获取到,然后向下 进行,
而此时 put() 还卡在 听q
get()
所以给 get()设置 守护进程, 在主进程释放后,将 get释放
主进程:
在让主进程通过 p.join() 等待 put()请求 结束 ,而put() 请求 会等get() 吃完, 最终 主进程才结束,然后 释放 get()
标签:producer als empty 完数 lock roc imp port 汇报
原文地址:http://www.cnblogs.com/onda/p/7083176.html