标签:消费者 oba 程序 mutex queue 消费 告诉 传输 name
一、进程间通信
利用队列(管道+锁)实现进程间通信:IPC机制
from multiprocessing import Queue q = Queue(5) # 产生一个最多能存放五个数据的队列 q.put(1) # 往队列中存放数据 q.put(2) q.put(3) q.put(4) q.put(5) q.put(6) # 存放的数据个数大于队列最大存储个数,程序会阻塞 print(q.full()) # 判断队列是否存满 # 存取数据 for i in range(5): q.put(i) print(q.get()) # get一次就取一次 print(q.get()) # get一次就取一次 print(q.get()) # get一次就取一次 print(q.get_nowait()) # 如果队列不为空,相当于get取值,否则会报错 print(q.empty()) # 判断队列是否为空,注意:在并发的情况下,这个方法判断不正确
二、基于队列实现进程间通信
from multiprocessing import Process,Queue def producer(q): q.put(‘hello world‘) def consumer(q): print(q.get()) if __name__ == ‘__main__‘: q = Queue() # 生成一个队列对象 p1 = Process(target=producer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start()
三、生成者消费者模型
生产者:生产数据的
消费者:处理数据的
作用:解决供需不平衡的问题
定义一个队列,用来存放固定数量的数据
解决一个生产者和消费者不需要直接打交道,两者都通过队列实现数据传输
def consumer(name,q): while True: data = q.get() time.sleep(random.randint(1,3)) print(‘%s吃了%s‘%(name,data)) q.task_done() # 告诉队列,已经将数据取出来并处理完毕 if __name__ == ‘__main__‘: q = JoinableQueue() p1 = Process(target=producer,args=(‘egon‘,‘包子‘,q)) p2 = Process(target=producer,args=(‘tank‘,‘生蚝‘,q)) c1 = Process(target=consumer,args=(‘owen‘,q)) c2 = Process(target=consumer,args=(‘kevin‘,q)) p1.start() p2.start() c1.daemon = True #守护进程,主进程结束,必须结束 c2.daemon = True c1.start() c2.start() # 等待消费者生产完所有数据 p1.join() p2.join() # 等待队列中数据全部取出 q.join() print(‘主‘) #队列中没有数据了,主进程结束,这时候c1,c2必须结束
四、线程
什么是线程:
进程是资源单位
线程是执行单位
注意:每一个进程都会自带一个线程
为什么要有线程:
开一个进程:
申请内存空间 耗时
将代码拷贝到新的内存空间中 耗时
开一个线程:
不需要申请内存空间
开线程的开销远远小于开进程的开销
五、开启进程的两种方式
1、导入模块
from threading import Thread import time def task(name): print(‘%s is running‘%name) time.sleep(1) print(‘%s is over‘%name) if __name__ == ‘__main__‘: t = Thread(target=task,args=(‘egon‘,)) t.start() # 开启线程的速度非常快 print(‘主‘)
2、自定义类
from threading import Thread import time class MyThread(Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(‘%s is running‘%self.name) time.sleep(1) print(‘%s is over‘%self.name) if __name__ == ‘__main__‘: t = MyThread(‘egon‘) t.start() print(‘主‘)
六、线程之间数据共享
from threading import Thread x = 100 def task(): global x x = 666 if __name__ == ‘__main__‘: t = Thread(target=task) t.start() t.join() print(x) # 666
七、线程互斥锁
保护数据安全性
from threading import Thread,Lock import time mutex = Lock() n = 100 def task(): global n mutex.acquire() # 抢锁 temp = n time.sleep(0.1) n = temp - 1 mutex.release() # 释放锁 t_list = [] for i in range(100): t = Thread(target=task) t.start() t_list.append(t) for t in t_list: t.join() print(n) # 0
八、线程对象的其他属性和方法
from threading import Thread,active_count,current_thread import os import time def task(name): print(‘%s is running‘%name,os.getpid()) print(‘%s is running‘%name,current_thread().name) #Thread-1 time.sleep(1) print(‘%s is over‘%name) def info(name): print(‘%s is running‘ % name, current_thread().name) # Thread-2 time.sleep(1) print(‘%s is over‘ % name) t = Thread(target=task,args=(‘egon‘,)) t1 = Thread(target=info,args=(‘lucas‘,)) t.start() t1.start() # t.join() # t1.join() print(active_count()) # 当前存活的线程数(主线程自带) print(os.getpid()) print(current_thread().name) # MainThread
九、守护线程
from threading import Thread import time def task(name): print(‘%s is running‘%name) time.sleep(1) print(‘%s is over‘%name) if __name__ == ‘__main__‘: t = Thread(target=task,args=(‘lucas‘,)) t.daemon = True t.start() print(‘主‘)
标签:消费者 oba 程序 mutex queue 消费 告诉 传输 name
原文地址:https://www.cnblogs.com/yanminggang/p/10826476.html