1. Thread queues
Thread queues come in three kinds: first-in first-out, last-in first-out, and priority-ordered. Details below:
```python
import queue

# FIFO: first in, first out
q = queue.Queue(3)

q.put(1)
q.put(2)
q.put(3)
# q.put(4)                        # queue is full: blocks until a consumer frees a slot
# q.put(4, block=False)           # no blocking: raises queue.Full immediately if full (equivalent to q.put_nowait(4))
# q.put(4, block=True)            # blocks while the queue is full; block=True is the default
# q.put(4, block=True, timeout=3) # blocks up to 3 seconds, then raises queue.Full if still full
print(q.full())
print(q.empty())

print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # queue is empty: blocks until new data arrives; block/timeout behave the same as for put()
print(q.full())
print(q.empty())


# LIFO: last in, first out, like a stack
q = queue.LifoQueue(3)

q.put(1)
q.put(2)
q.put(3)
# put()/get() blocking, block=False, and timeout behave exactly as for queue.Queue above
print(q.full())
print(q.empty())

print(q.get())  # 3 comes out first
print(q.get())
print(q.get())
print(q.full())
print(q.empty())


# Priority queue: the smaller the priority value, the earlier the item comes out
q = queue.PriorityQueue(3)

q.put([50, 1])
q.put([20, 2])
q.put([30, 3])
# q.put([50, 4])  # queue is full: blocks until a consumer frees a slot
print(q.full())
print(q.empty())

print(q.get())  # [20, 2] first: the lowest priority value wins
print(q.get())
print(q.get())
# print(q.get())  # queue is empty: blocks; block/timeout behave the same as for put()
print(q.full())
print(q.empty())
```
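To verify the priority ordering described above, here is a small self-contained check (a sketch of my own; it uses tuples, which PriorityQueue accepts just like lists):

```python
import queue

q = queue.PriorityQueue()
q.put((50, "low"))
q.put((20, "high"))
q.put((30, "mid"))

# Items come out ordered by the first element, smallest first.
order = [q.get() for _ in range(3)]
print(order)  # [(20, 'high'), (30, 'mid'), (50, 'low')]
```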
2. Process pools & thread pools
When we first learn multiprocessing or multithreading, we eagerly build concurrent socket servers on top of them.
The fatal flaw of that approach is that the number of processes or threads the server spawns grows with the number of concurrent clients, which puts enormous pressure on the server host and can even overwhelm it entirely.
We therefore have to cap the number of processes or threads the server opens, keeping the machine within a load it can bear. That is exactly what process pools and thread pools are for.
A process pool, for example, is a container for processes: it is still multiprocessing underneath, just with a limit on how many processes may be started.
2.1 Basic usage:
1. submit(fn, *args, **kwargs)
Submits a task asynchronously.
2. map(func, *iterables, timeout=None, chunksize=1)
Replaces a for loop of submit calls.
3. shutdown(wait=True)
Equivalent to pool.close() + pool.join() on a multiprocessing Pool.
wait=True: block until every task in the pool has finished and its resources have been reclaimed.
wait=False: return immediately, without waiting for the pool's tasks to finish.
Either way, the program as a whole still waits for all tasks to complete before exiting.
submit and map must be called before shutdown.
4. result(timeout=None)
Fetches the task's result.
5. add_done_callback(fn)
Attaches a callback function.
```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import time, random, os


def sayhi(name):
    print("%s say hi... pid:%s; current_thread:%s" % (name, os.getpid(), current_thread().getName()))
    time.sleep(random.randint(1, 3))
    print("%s say bye... pid:%s; current_thread:%s" % (name, os.getpid(), current_thread().getName()))


if __name__ == "__main__":
    # pool = ProcessPoolExecutor(3)  # process pool capped at 3 processes
    pool = ThreadPoolExecutor(3)     # thread pool capped at 3 threads
    for i in range(10):
        pool.submit(sayhi, "xg%s" % i)
    # After shutdown, submit is disabled: no new tasks may enter the pool.
    pool.shutdown(wait=True)  # waits until every task in the pool has run to completion, like joining each worker
    print("all over!")
```
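Since submit() returns a Future, results can also be collected after shutdown(). A minimal sketch (the square function and pool size are illustrative, not from the original post):

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


pool = ThreadPoolExecutor(2)                          # at most 2 worker threads
futures = [pool.submit(square, i) for i in range(5)]  # asynchronous submission
pool.shutdown(wait=True)                              # close() + join(): wait for all tasks
print([f.result() for f in futures])                  # [0, 1, 4, 9, 16]
```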
2.2 Synchronous calls: the threads below are effectively serialized, because we wait for the first task's result before continuing
```python
# Fishing contest: contestants fish, then the catch is weighed.
from concurrent.futures import ThreadPoolExecutor
import time, random


def fishing(name):
    print("%s is fishing..." % name)
    time.sleep(random.randint(2, 5))
    fish = random.randint(5, 15) * "m"
    res = {"name": name, "fish": fish}
    return res


def weigh(res):
    name = res["name"]
    size = len(res["fish"])
    print("%s caught a fish weighing %s kg" % (name, size))


if __name__ == "__main__":
    pool = ThreadPoolExecutor(3)
    res1 = pool.submit(fishing, "xt").result()  # blocks for the result before moving on
    weigh(res1)
    res2 = pool.submit(fishing, "dj").result()
    weigh(res2)
    res3 = pool.submit(fishing, "hh").result()
    weigh(res3)
```
2.3 Asynchronous callbacks
```python
from concurrent.futures import ThreadPoolExecutor
import time, random


def fishing(name):
    print("%s is fishing..." % name)
    time.sleep(random.randint(2, 5))
    fish = random.randint(5, 15) * "m"
    res = {"name": name, "fish": fish}
    return res


def weigh(future):
    res = future.result()  # the task has already finished when the callback fires, so this returns immediately
    name = res["name"]
    size = len(res["fish"])
    print("%s caught a fish weighing %s kg" % (name, size))


if __name__ == "__main__":
    pool = ThreadPoolExecutor(3)
    pool.submit(fishing, "xt").add_done_callback(weigh)  # when the task finishes, its Future is passed to weigh
    pool.submit(fishing, "dj").add_done_callback(weigh)
    pool.submit(fishing, "hh").add_done_callback(weigh)
```
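The same callback mechanism in compact form (the task function and names here are illustrative): add_done_callback hands the finished Future to the callback, so calling result() inside it never blocks.

```python
from concurrent.futures import ThreadPoolExecutor

results = []


def work(n):
    return n * 10


def on_done(future):
    # Receives the completed Future; result() is already available here.
    results.append(future.result())


pool = ThreadPoolExecutor(2)
for i in range(3):
    pool.submit(work, i).add_done_callback(on_done)
pool.shutdown(wait=True)  # all tasks (and their callbacks) have run by now
print(sorted(results))    # [0, 10, 20]
```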
2.4 Using map
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random


def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future = executor.submit(task, i)

    executor.map(task, range(1, 12))  # map replaces the for + submit loop
```
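One detail the snippet above does not show: map() returns an iterator, and its results come back in input order even if the tasks finish out of order. A minimal sketch:

```python
from concurrent.futures import ThreadPoolExecutor


def task(n):
    return n ** 2


executor = ThreadPoolExecutor(max_workers=3)
results = list(executor.map(task, range(1, 6)))  # results preserve input order
executor.shutdown()
print(results)  # [1, 4, 9, 16, 25]
```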
Original post: https://www.cnblogs.com/znyyy/p/10175255.html