标签:one 这一 队列 star ipc start col nbsp 结束
生产者+消费者模型
从网上爬取数据
从网页上获取数据的过程 --》 生产数据的过程,爬取网页,是生产者行为
把数据取回来进行分析得出结果 --》数据消费过程,是消费者行为
使用队列来完成生产、消费的过程
生产者,是进程
消费者,是进程
生产者与消费者之间,传递数据,需要一个盘子(IPC)
# 没设置时间延迟的情况下 from multiprocessing import Queue, Process def consumer(name, q): while 1: food = q.get() # 会阻塞,直到队列里有数据就取出来 print("%s把包子%s号给吃了" % (name, food)) def producer(q, food_name): for i in range(10): food = "%s%s" % (food_name, i) print("我做了%s号" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0号 # 我做了包子1号 # 我做了包子2号 # 我做了包子3号 # 我做了包子4号 # alex把包子包子0号给吃了 # 我做了包子5号 # 我做了包子6号 # 我做了包子7号 # 我做了包子8号 # 我做了包子9号 # alex把包子包子1号给吃了 # alex把包子包子2号给吃了 # alex把包子包子3号给吃了 # alex把包子包子4号给吃了 # alex把包子包子5号给吃了 # alex把包子包子6号给吃了 # alex把包子包子7号给吃了 # alex把包子包子8号给吃了 # alex把包子包子9号给吃了 # 运行发现明显生产比消费更快,供大于求,注意程序并没有停止运行
# 设置时间延迟后 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 会阻塞,直到队列里有数据就取出来 time.sleep(1) print("%s把包子%s号给吃了" % (name, food)) def producer(q, food_name): for i in range(10): time.sleep(0.5) food = "%s%s" % (food_name, i) print("我做了%s号" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0号 # 我做了包子1号 # 我做了包子2号 # alex把包子包子0号给吃了 # 我做了包子3号 # alex把包子包子1号给吃了 # 我做了包子4号 # 我做了包子5号 # alex把包子包子2号给吃了 # 我做了包子6号 # 我做了包子7号 # alex把包子包子3号给吃了 # 我做了包子8号 # 我做了包子9号 # alex把包子包子4号给吃了 # alex把包子包子5号给吃了 # alex把包子包子6号给吃了 # alex把包子包子7号给吃了 # alex把包子包子8号给吃了 # alex把包子包子9号给吃了 # 运行发现,虽然设置了延迟,但是生产(producer)的延迟比消费(consumer)的延迟更短 # 所以还是供大于求
# 修改生产者与消费者的延迟设置 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 会阻塞,直到队列里有数据就取出来 time.sleep(0.5) print("%s把包子%s号给吃了" % (name, food)) def producer(q, food_name): for i in range(10): time.sleep(1) food = "%s%s" % (food_name, i) print("我做了%s号" % food) q.put(food) if __name__ == "__main__": q = Queue() p = Process(target=consumer, args=("alex", q)) p1 = Process(target=producer, args=(q, "包子")) p.start() p1.start() # 我做了包子0号 # alex把包子包子0号给吃了 # 我做了包子1号 # alex把包子包子1号给吃了 # 我做了包子2号 # alex把包子包子2号给吃了 # 我做了包子3号 # alex把包子包子3号给吃了 # 我做了包子4号 # alex把包子包子4号给吃了 # 我做了包子5号 # alex把包子包子5号给吃了 # 我做了包子6号 # alex把包子包子6号给吃了 # 我做了包子7号 # alex把包子包子7号给吃了 # 我做了包子8号 # alex把包子包子8号给吃了 # 我做了包子9号 # alex把包子包子9号给吃了 # 很明显,在只有一个消费者和一个生产者的前提下,如果生产的时间要比消费的时间更长 # 那么每生产一个包子,消费者都能立刻吃掉,并且等着下一个生产好的包子
# 基于队列实现生产者消费者模型 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() # 会阻塞,直到队列里有数据就取出来 time.sleep(2) print("%s拿到了包子%s号后并把它吃掉" % (name, food)) def producer(q, food_name): for i in range(20): # 生产20个包子 time.sleep(0.1) food = "%s%s" % (food_name, i) print("我做了%s号" % food) q.put(food) # 如果队列满了就在这里阻塞,直到队列里有空位置 if __name__ == "__main__": q = Queue(5) # 这个队列的容量,这里是指 6个的容量 p = Process(target=consumer, args=("alex", q)) # 消费者1号 p1 = Process(target=consumer, args=("太白", q)) # 消费者2号 p2 = Process(target=producer, args=(q, "包子")) # 生产者1号 p.start() p1.start() p2.start() # 我做了包子0号 # 我做了包子1号 # 我做了包子2号 # 我做了包子3号 # 我做了包子4号 # 我做了包子5号 # 我做了包子6号 # 我做了包子7号 # alex拿到包子包子0号并把它吃掉 # 太白拿到包子包子1号并把它吃掉 # 我做了包子8号 # 我做了包子9号 # alex拿到包子包子2号并把它吃掉 # 我做了包子10号 # 太白拿到包子包子3号并把它吃掉 # 我做了包子11号 # alex拿到包子包子4号并把它吃掉 # 太白拿到包子包子5号并把它吃掉 # 我做了包子12号 # 我做了包子13号 # alex拿到包子包子6号并把它吃掉 # 我做了包子14号 # 太白拿到包子包子7号并把它吃掉 # 我做了包子15号 # alex拿到包子包子8号并把它吃掉 # 我做了包子16号 # 太白拿到包子包子9号并把它吃掉 # 我做了包子17号 # alex拿到包子包子10号并把它吃掉 # 我做了包子18号 # 太白拿到包子包子11号并把它吃掉 # 我做了包子19号 # alex拿到包子包子12号并把它吃掉 # 太白拿到包子包子13号并把它吃掉 # alex拿到包子包子14号并把它吃掉 # 太白拿到包子包子15号并把它吃掉 # alex拿到包子包子16号并把它吃掉 # 太白拿到包子包子17号并把它吃掉 # alex拿到包子包子18号并把它吃掉 # 太白拿到包子包子19号并把它吃掉 # alex拿到包子None号并把它吃掉 # 太白拿到包子None号并把它吃掉 # 注意一开始为什么要生产8个包子才开始吃? # 因为是本来这个队列q = Queue(5)限定了只能放6个包子进去 # 但是生产者一旦把6个包子放进去,两个消费者就分别从里面取出一个包子 # 只不过,重点是消费者要2s后才能继续拿包子吃,但他们已经把两个包子拿出来了 # 所以生产者还得生产2个包子放进去,把队列排满 # 运行发现,中间之所以吃了一个才会生产一个,是因为队列已经满了
# 前面所有的示例, 都是主进程永远不会结束 # 原因是:生产者p在生产完后就结束了 # 但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步 # 解决办法: 主进程里发结束信号,但主进程需要等生产者结束后再发送该信号 from multiprocessing import Queue, Process import time def consumer(name, q): while 1: food = q.get() if not food: break time.sleep(2) print("%s拿到了%s号并开始吃它" % (name, food)) def producer(q, food_name): for i in range(20): # 生产20个包子 time.sleep(0.1) food = "%s%s" % (food_name, i) print("我做了%s号" % food) q.put(food) # 如果队列满了就在这里阻塞,直到队列里有空位置 if __name__ == "__main__": q = Queue(5) # 这个队列的容量,这里是指 6个的容量 p = Process(target=consumer, args=("alex", q)) # 消费者1号 p1 = Process(target=consumer, args=("太白", q)) # 消费者2号 p2 = Process(target=producer, args=(q, "包子")) # 生产者1号 p.start() p2.start() p1.start() p2.join() # 确认生产者把所需生产数量都生产完毕 q.put(None) q.put(None) # 运行结果和上面一样,不同的是程序会结束 # 吃的人是消费数据的 # 制造吃的人是生产数据的 # 生产快,消费慢,供过于求 # 增加消费者 # 生产慢,消费快,供不应求 # 增加生产者
# 控制内存 import queue q = queue.Queue(5) # 设置一个容量值,保护内存 q.put(1) print("1 over") q.put(2) print("2 over") q.put(3) print("3 over") q.put(4) print("4 over") q.put(5) print("5 over") q.put(6) print("6 over") # 1 over # 2 over # 3 over # 4 over # 5 over # 运行发现程序不会停,因为它还在等,等设置的内存里减少东西再执行q.put(6)
标签:one 这一 队列 star ipc start col nbsp 结束
原文地址:https://www.cnblogs.com/shawnhuang/p/10323718.html