标签:生产者消费者模型 指定 port 死锁 decode 队列 导致 经历 threading
对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
同步:当进程执行IO(等待外部数据)的时候,-----等。同步(例如打电话的时候必须等)
异步:当进程执行IO(等待外部数据)的时候,-----不等,去执行其他任务,一直等到数据接收成功,再回来处理。异步(例如发短信)
当我们去爬取一个网页的时候,要爬取多个网站,有些人可能会发起多个请求,然后通过函数顺序调用。执行顺序也是先调用先执行。效率非常低。
同步还是异步,这需要我们根据需求来决定
lock、GIL 就是为了达到同步,来保证安全.
而异步在多进程/线程IO的时候能提高效率.
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数...
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
实例:
from multiprocessing import Pool # 进程池类 import os,time def work(n): print(‘%s run‘ %os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限 res_l.append(res) print(res_l)
from multiprocessing import Pool import os,time def work(n): print(‘%s run‘ %os.getpid()) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res res_l.append(res) #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
死锁与递归锁(可重入锁)
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都
正在使用,所有这两个线程在无外力作用下将一直等待下去。更直观的死锁比如,有两个lock对象,同一资源分别被两个进程的lock.requare阻塞,就造成程序永久的阻塞
解决死锁就可以用递归锁
实例:
import threading,time # lock_A = threading.Lock() # lock_B = threading.Lock() r_lock = threading.RLock() class Mythread(threading.Thread): def actionA(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def actionB(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def run(self): self.actionA() self.actionB() li = [] for i in range(5): t = Mythread() t.start() li.append(t) for t in li: t.join() print("ending")
为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:
threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
信号量
BoundedSemaphore或Semaphore
实际上也是一种互斥(也就是锁),该锁用于限制线程的并发量
也就是说:信号量用来控制线程并发数的,
信号量管理一个内置的计数 器,每当调用acquire()时-1,调用release()时+1。
计数器不能小于0,当计数器为 0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。(类似于停车位的概念)
BoundedSemaphore与Semaphore的唯一区别在于前者将在调用release()时检查计数 器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。
import threading,time class myThread(threading.Thread): def run(self): #启动后,执行run方法 if semaphore.acquire(): #加把锁,可以放进去多个(相当于5把锁,5个钥匙,同时有5个线程) print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) #同时能有几个线程进去(设置为5就是一次5个线程进去),类似于停车厂一次能停几辆车 thrs=[] #空列表 for i in range(100): #100个线程 thrs.append(myThread()) #加线程对象 for t in thrs: t.start() #分别启动
Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等方法用于实现线程间的通信。
1 设置信号
使用Event的set()方法可以设置Event对象内部的信号标志为真。Event对象提供了isSet()方法来判断其内部信号标志的状态。当使用event对象的set()方法后,isSet()方法返回真
2 清除信号
使用Event对象的clear()方法可以清除Event对象内部的信号标志,即将其设为假,当使用Event的clear方法后,isSet()方法返回假
3 等待
Event对象wait的方法只有在内部信号为真的时候才会很快的执行并完成返回。当Event对象的内部信号标志位假时,则wait方法一直等待到其为真时才返回。
import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚大家都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>可以下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() 同步条件Event
import time from threading import Thread, Event e = Event() def start(): print("began start!") time.sleep(2) print("server start!") e.set() def conn(): while True: print("try connect...") e.wait(timeout=1) # 在这里阻塞 等1s 之后还没有isSet() 就退出 if e.isSet(): print("connect successful!") break else: print("connect filed!") Thread(target=start).start() Thread(target=conn).start()
生产者消费者模型
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
实例:
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 生产了 %s\033[0m‘ %(os.getpid(),res)) if __name__ == ‘__main__‘: q=Queue() #生产者们:即厨师们 p1=Process(target=producer,args=(q,)) #消费者们:即吃货们 c1=Process(target=consumer,args=(q,)) #开始 p1.start() c1.start() print(‘主进程结束‘)
在并发编程中,只要涉及到生产数据和处理数据,使用这个模式能够改善生产与消费时快时慢,不同步而导致的效率低下,并且能实现并发!
回调函数
进程池中的生产者生产完了,就立即告知主进程:我好了额,可以处理我的结果了。主进程则调用一个消费者函数去处理该结果,该函数即回调函数
这样主进程在调用消费者时直接拿到的是生产者的结果,而不需要借助队列来传递,某些时候就可以减少阻塞.
import requests import re,os from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def crawler(url): print("%s正在请求%s"%(os.getppid(),url)) response = requests.get(url) print("%s请求%s成功!"%(os.getppid(),url)) return response def handle(obj): response = obj.result() htm = response.content.decode("utf8") # 从报头中获取 #没解码时 会看到 charset=utf-8 print(re.findall("href=(.*?com)",htm)) print("%s解析完成!"%os.getppid()) if __name__ == ‘__main__‘: urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.jd.com", "https://sale.vmall.com", "http://uland.taobao.com"] pool = ProcessPoolExecutor(3) for i in urls: obj = pool.submit(crawler,i) obj.add_done_callback(handle) # 将结果的一个对象给handle函数
import requests import re,os from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor # def crawler(url): print("%s正在请求%s"%(os.getppid(),url)) response = requests.get(url) print("%s请求%s成功!"%(os.getppid(),url)) return response def handle(response): htm = response.content.decode("utf8") # 从报头中获取 #没解码时 会看到 charset=utf-8 print(re.findall("href=(.*?com)",htm)) print("解析完成!") if __name__ == ‘__main__‘: urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.jd.com", "https://sale.vmall.com", "http://uland.taobao.com"] pool = ProcessPoolExecutor(3) objs = [] for i in urls: obj = pool.submit(crawler,i) objs.append(obj) pool.shutdown() # ??? for i in objs: res = i.result() # 这是个阻塞函数 会阻塞到有结果为止 也就是说 crawler是串行的 handle(res)
标签:生产者消费者模型 指定 port 死锁 decode 队列 导致 经历 threading
原文地址:https://www.cnblogs.com/guyanzhi/p/10222046.html