标签:sleep bin 注意 last imp 服务 分享图片 cpu核数 recent
之前用的multiprocessing.Process和threading.Thread都是一个线程只能执行一个任务,如果想用一个线程执行多个任务,该怎么办呢?
from threading import Thread def func(): print(‘执行一个线程‘) if __name__ == ‘__main__‘: t = Thread(target=func) t.start() t.start() >> 执行一个线程 Traceback (most recent call last): File "H:/exercise/并发/进程池与线程池/demo.py", line 9, in <module> t.start() File "C:\python36\lib\threading.py", line 842, in start raise RuntimeError("threads can only be started once") RuntimeError: threads can only be started once
可重复利用的线程
from threading import Thread import queue import time class MyThread(Thread): def __init__(self): super().__init__() self.queue = queue.Queue() #实例一个队列 self.daemon = True # 主线程结束,子线程也当结束 self.start() # 实例的时候就启动线程 def run(self): # 不断获取并执行任务 while True: func,args,kwargs = self.queue.get() # 等待获取任务,没有任务就阻塞 try: func(*args,**kwargs) # 执行任务 finally: self.queue.task_done() # 告诉queue 这次任务执行完毕 队列计数器会减1 def apply_async(self,func,args=(),kwargs={}): self.queue.put((func,args,kwargs)) # 把任务添加到队列中 队列计数器会加1 def join(self): self.queue.join() # 若队列还有任务,则会阻塞,若队列没有任务了,不会阻塞 队列根据计数器判断是否还有任务 def func1(): time.sleep(2) print(‘任务1‘) def func2(): time.sleep(2) print(‘任务2‘) def func3(): time.sleep(2) print(‘任务3‘) if __name__ == ‘__main__‘: thread = MyThread() thread.apply_async(func1) # 添加任务到线程队列 thread.apply_async(func2) thread.apply_async(func3) thread.join() # 如果没有这个 会因为主线程结束,子线程不执行,有了这个,当队列任务没执行完之前,将阻塞再这里 print(‘执行完毕!‘) >> 任务1 任务2 任务3 执行完毕!
线程池
线程池的简单实现
方法一:
import threading import queue import time class MyThreadPool: def __init__(self,n): self.queue = queue.Queue() for i in range(n): threading.Thread(target=self.worker,daemon=True).start() def worker(self): #不断获取并执行任务 while True: func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞 func(*args,**kwargs) #执行任务 self.queue.task_done() #每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束 def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列 self.queue.put((func,args,kwargs)) def join(self): self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞 def func1(): time.sleep(2) print(‘任务1‘) def func2(): time.sleep(2) print(‘任务2‘) def func3(): time.sleep(2) print(‘任务3‘) def func4(name): time.sleep(2) print(name) if __name__ == ‘__main__‘: pool = MyThreadPool(3) pool.apply_async(func1) pool.apply_async(func2) pool.apply_async(func3) pool.apply_async(func4,args=(‘Jack‘,)) pool.join() # 阻塞直至所有任务执行完,如果没有,会因为主线程结束而不执行子线程 >> 两秒后打印: 任务3 任务1 任务2 再过两秒打印: Jack
方法二:面向对象的方式
import threading import queue import time class MyThread(threading.Thread): def __init__(self,queue): super().__init__() self.queue = queue self.daemon = True # 主线程结束,子线程也应该结束 self.start() def run(self): # 不断获取并执行任务 while True: func,args,kwargs = self.queue.get() # 获取任务 没有任务就阻塞 func(*args,**kwargs) # 执行任务 self.queue.task_done() # 每次队列加元素,计数器+1,每次task_done() 计数器-1,如果计数器为0,认为队列任务结束 class Threadpool(): def __init__(self,n): self.queue = queue.Queue() for i in range(n): MyThread(self.queue) # 开启n个线程,队列等待接受任务 def apply_async(self,func,args=(),kwargs={}): # 把任务放到队列 self.queue.put((func,args,kwargs)) def join(self): self.queue.join() #queue.join() 队列里有元素,会阻塞,直到队列没有元素,才会不阻塞 def func1(): time.sleep(2) print(‘任务1‘) def func2(): time.sleep(2) print(‘任务2‘) def func3(): time.sleep(2) print(‘任务3‘) if __name__ == ‘__main__‘: pool = Threadpool(2) pool.apply_async(func1) pool.apply_async(func2) pool.apply_async(func3) pool.join() # 阻塞直至所有任务执行完,如果没有join,会因为主线程结束而不执行子线程 print(‘任务执行完毕‘) >> 2s后打印 任务2 任务1 再过2s打印 任务3 任务执行完毕
注意:线程是由解释器调度的,我们无法控制线程的执行顺序。
python自带的线程池
使用进程池
from multiprocessing import Pool #进程池 import time def func(n): time.sleep(2) print(n) if __name__ == ‘__main__‘: pool = Pool(4) # 实例进程池,不传参数默认是CPU核数 for i in range(10): pool.apply_async(func,args=(i,)) #把任务提交到队列 pool.close() #关闭进程池,不让再提交任务 pool.join() #等待队列任务都完成,规定在join()之前先要close
使用线程池
from multiprocessing.pool import ThreadPool import time def func(n): time.sleep(2) print(n) if __name__ == ‘__main__‘: pool = ThreadPool(4) # 实例线程池,不传参数默认是CPU核数 for i in range(10): pool.apply_async(func,args=(i,)) #把任务提交到队列 pool.close() #关闭线程池,不让再提交任务 pool.join() #等待队列任务都完成,规定在join()之前先要close
两个库的api基本一致,不过执行起来不一样,进程对计算密集型的操作比较拿手,可以调度多个CPU执行。线程是轻量级的进程,但是只是一个CPU执行,更适用于IO密集型操作。
使用线程池来实现并发服务器
import socket from multiprocessing.pool import ThreadPool def worker(conn,addr): while True: data = conn.recv(1024) if data: print(data.decode()) conn.send(data) else: conn.close() print(‘{}已关闭‘.format(addr)) break if __name__ == ‘__main__‘: pool = ThreadPool() sock = socket.socket() sock.bind((‘‘,9999)) sock.listen(5) print(‘开始监听!‘) while True: conn,addr = sock.accept() print(‘{}已连接‘.format(addr)) pool.apply_async(worker,args=(conn,addr))
标签:sleep bin 注意 last imp 服务 分享图片 cpu核数 recent
原文地址:https://www.cnblogs.com/woaixuexi9999/p/9332880.html