标签:eve 高度 targe start [] span put cal send
一.线程队列
队列:
1.Queue
先进先出
自带锁 数据安全
from queue import Queue
from multiprocessing import Queue (IPC队列)
2.LifoQueue后进先出
后进先出
自带锁 数据安全
from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.put_nowait("miss")) #报错 queue.Full print(lq) # <queue.LifoQueue object at 0x0000017901BC8C88> print(lq.get()) #love print(lq.get()) #999 print(lq.get()) #888 print(lq.get()) #666 print(lq.get()) #123 #print(lq.get_nowait()) #报错 queue.Empty
3.PriorityQueue优先级队列
(放元组,数字从小到大,英文字母按ASCII码先后顺序)
from queue import PriorityQueue pq=PriorityQueue(4) pq.put((10,"aaa")) pq.put((5,"S")) pq.put((5,"ccc")) pq.put((10,"zzz")) #pq.put_nowait((10,"bbb")) #报错queue.Full print(pq) # <queue.PriorityQueue object at 0x000001D6FEF38C50> print(pq.get()) print(pq.get()) #(5, ‘ccc‘) print(pq.get()) #(10, ‘aaa‘) print(pq.get()) #(10, ‘zzz‘) print(pq.get()) #(20, ‘bbb‘) # print(pq.get_nowait()) # 报错queue.Empty
二 线程池
Multiprocessing模块 自带进程池Pool
Threading 模块 没有Pool(没有线程池)
concurrent.futures帮助你管理线程池和进程池
高度封装
进程池/线程池的统一的统一的使用方法
import time from threading import currentThread from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor def func(i): time.sleep(1) print("in %s %s"%(i,currentThread())) return i**2 def back(fn): print(fn.result(),currentThread()) t=ThreadPoolExecutor(5) ret_l=[] for i in range(20): ret=t.submit(func,i).add_done_callback(back) # ret_l.append(ret) t.shutdown(wait=True) #括号里可以省略 # for ret in ret_l: # print(ret.result()) print(666)
ThreadPoolExecutor的相关方法:
1.t.map方法 启动多线程任务 # t.map(func,range(20)) 替代for submit
2.t.submit(func,*args,**kwargs) 异步提交任务
3.t.shutdown (wait=True) 相当于进程池的pool.close()+pool.join()操作 同步控制
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
submit和map必须在shutdown之前
4.result获取结果 ret.result()
5.回调函数 add_done_callback(back)
在回调函数内接收的参数是一个对象,需要通过result来获取返回值
在主进程中执行
三.协程
进程:资源分配的最小单位
线程 :CPU调度的最小单位
协程: 能在一条线程的基础上,在多个任务之间互相切换
节省线程开启的消耗
从python代码的级别调度
正常的线程是CPU调度的最小单位
协程的调度并不是由操作系统来完成的.
(一).yield的机制就是协程
def func(): print(1) x=yield "aaa" print(x) yield "bbb" g=func() print(next(g)) print(g.send("***"))
(二).在多个函数之间互相切换的功能--协程
def consumer(): while True: x=yield print(x) def producer(): g=consumer() next(g) for i in range(10): g.send(i) producer()
yeild 只有程序之间的切换,没有重利用任何IO操作的时间
greenlet(第三方模块) 程序上下文切换
cmd : pip3 install 模块名 安装第三方模块
(三).greenlet
协程模块 单纯的程序切换耗费时间
import time from greenlet import greenlet def eat(): print(‘吃‘) time.sleep(1) g2.switch() print("吃完了") time.sleep(1) g2.switch() def play(): print("玩") time.sleep(1) g1.switch() print("玩美了") g1=greenlet(eat) g2=greenlet(play) g1.switch()
(四).gevent
遇到IO就切换 使用协程减少IO操作带来的时间消耗
greenlet 是gevent的底层
gevent是基于greenlet实现的
python代码在控制程序的切换
第一版:
import time import gevent from gevent import monkey def eat(): print("吃") gevent.sleep(2) print("吃完了") def play(): print("玩") gevent.sleep(2) print("玩美了") g1=gevent.spawn(eat) g2=gevent.spawn(play) g1.join() #等待g1结束 g2.join() #等待g2结束
第二版
要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
from gevent import monkey;monkey.patch_all() import time import gevent def eat(name): print("吃") time.sleep(2) print("%s吃完了"%name) def play(): print("玩") time.sleep(2) print("玩美了") g1=gevent.spawn(eat,"alex") #括号里传参第一个是函数名,后面可以跟多个参数可以是位置参数,也可以是关键字参数,都是传给eat的 g2=gevent.spawn(play) gevent.joinall([g1,g2])# g1.join()和g2.join()合并成一个. print(g1.value) #None
四.协程起socket(tcp)
服务器代码
from gevent import monkey;monkey.patch_all() import socket import gevent def talk(conn): while True: conn.send(b‘hallo‘) print(conn.recv(1024)) sk=socket.socket() sk.bind(("127.0.0.1",9902)) sk.listen() while True: conn,addr=sk.accept() gevent.spawn(talk,conn)
客户端代码
import socket from threading import Thread def client(): sk=socket.socket() sk.connect(("127.0.0.1",9902)) while True: print(sk.recv(1024)) sk.send(b‘hi‘) for i in range(5): Thread(target=client).start()
python全栈开发 * 线程队列 线程池 协程 * 180731
标签:eve 高度 targe start [] span put cal send
原文地址:https://www.cnblogs.com/J-7-H-2-F-7/p/9398394.html