标签:必须 def 取数 color art 生产 port 生产者 解决
# 多进程的生产者消费者模型 # 队列 # 队列是进程安全的,同时只能有一个进程从队列中取到数据 # 生产者消费者模型 # 为什么要这个模型 # 这个模型经常性的解决数据的供需不平衡的问题 # 经常有两拨人,一拨是生产数据的,一拨是消费数据的。 # 消费者指的是使用数据处理数据的一端 # 生产数据的一端生产的数据过快 # 当生产数据过快时,消费数据过慢时,可以弄出一个进程队列,将生产的数据放入到队列中,消费数据端可以开多个进程从这个进程队列中消费数据 # 当生产数据过慢时,消费数据过快时,可以弄出一个进程队列,多几个进程作为生产者,将生产的数据放到队列中,消费者从队列中消费数据 # 生产者 消费者 模型 # 生产者,每一个生产者就是一个进程,每一个消费者就是一个进程 # 下面的例子,当生产者生产的东西全部生产完了以后,消费者进程还一直阻塞在队列那里等待数据,让消费者进程结束使用的方法是,在阻塞等待生产者执行后了后,向队列中插入了 # None值,消费者判断队列中的值为none后,则退出消费者进程。但是因为进程队列是进程安全的,多个进程一个时间点上只能有一个进程获取到队列中的数据,所以当有一个 # 消费者拿到队列中的None后,队列中就没有了None,其他消费者就拿不到None无法退出消费者进程,必须要有多个消费者就要向队列中丢多少 # 个None才能让消费者进程全部结束 # import time # import random # from multiprocessing import Process, Queue # # # def producer(name, food, q): # ‘‘‘ # 生产者 # name:谁生产 # food:生产了什么东西 # q:生产出来的东西放到哪里 # :return: # ‘‘‘ # for i in range(4): # time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 # f = (‘%s 生产了%s %s‘ % (name, food, i)) # 模拟生产数据 # print(f) # 查看生产的数据 # q.put(f) # 将生产的数据放入到进程队列中 # # def consumer(q, name): # ‘‘‘ # 消费者 # q:数据在哪里 # name:谁来消费 # :return: # ‘‘‘ # while 1: # food = q.get() # if food is None: # print(‘%s 获取到了一个空‘ % name) # break # print(‘\033[31m%s 消费了 %s\033[0m‘ % (name, food)) # time.sleep(random.randint(1, 3)) # # if __name__ == ‘__main__‘: # q = Queue(20) # p = Process(target=producer, args=(‘Egon‘, ‘包子‘, q)) # p.start() # # p1 = Process(target=producer, args=(‘wusir‘, ‘泔水‘, q)) # p1.start() # # p2 = Process(target=consumer, args=(q, ‘why‘)) # p2.start() # # p3 = Process(target=consumer, args=(q, ‘Jbox‘)) # p3.start() # # p.join() # 阻塞等待生产者1进程结束 # p1.join() # 阻塞等待生产者2进程结束 # # q.put(None) # q.put(None) # 下面的例子使用JoinableQueue进程队列实现生产者消费者模型,放入数据到队列中时,内部多了一个计数+的机制,从队列中取数据后,调用队列的task_done方法,将计数-1 # 然后在生产者中使用队列的join阻塞等待队列的数据全部为空,全部被消费者消费掉,如果数据全被消耗掉了,生产者进程才会结束 import time import random from multiprocessing import Process, JoinableQueue def producer(name, food, q): ‘‘‘ 生产者 name:谁生产 food:生产了什么东西 q:生产出来的东西放到哪里 :return: ‘‘‘ for i in range(4): time.sleep(random.randint(1, 3)) # 模拟生产数据的时间 f = (‘%s 生产了%s %s‘ % (name, food, i)) # 模拟生产数据 print(f) # 查看生产的数据 q.put(f) # 将生产的数据放入到进程队列中,每放入一个数据到队列中后,机制中的count就会被+1 q.join() # 阻塞等待队列中的数据全部被取走,q队列中的数据全部被取走则不再阻塞在这里 def consumer(q, name): ‘‘‘ 消费者 q:数据在哪里 name:谁来消费 :return: ‘‘‘ while 1: food = q.get() if food is None: print(‘%s 获取到了一个空‘ % name) break print(‘\033[31m%s 消费了 %s\033[0m‘ % (name, food)) time.sleep(random.randint(1, 3)) q.task_done() # 每从队列中消费了一个数据后,让机制中的conut计数减一 if __name__ == ‘__main__‘: q = JoinableQueue(20) p = Process(target=producer, args=(‘Egon‘, ‘包子‘, q)) p.start() p1 = Process(target=producer, args=(‘wusir‘, ‘泔水‘, q)) p1.start() p2 = Process(target=consumer, args=(q, ‘why‘)) p2.daemon = True # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束 p2.start() p3 = Process(target=consumer, args=(q, ‘Jbox‘)) p3.daemon = True # 设置为守护进程的目的是为了让主进程的代码执行完毕后,该守护进程的代码自动结束 p3.start() p.join() # 阻塞等待生产者1进程结束 p1.join() # 阻塞等待生产者2进程结束 print(‘生产者进程结束,同时主进程要结束,因为其他两个消费者进程被设置为守护进程,所以主进程结束后,那两个消费者守护进程也会结束‘)
标签:必须 def 取数 color art 生产 port 生产者 解决
原文地址:https://www.cnblogs.com/whylinux/p/9824705.html