标签:
import sys
import threading
import traceback

try:
    import queue as Queue  # Python 3 name of the module
except ImportError:  # Python 2 fallback
    import Queue


# Exceptions used to signal the state of the pool to the caller.
class NoResultsPending(Exception):
    """All work requests have been processed."""
    pass


class NoWorkersAvailable(Exception):
    """No worker threads are available to process the remaining requests."""
    pass


def _handle_thread_exception(request, exc_info):
    """Default exception callback: print the traceback of a failed request.

    :param request: the WorkRequest whose callable raised (unused here).
    :param exc_info: the ``sys.exc_info()`` tuple captured in the worker.
    """
    traceback.print_exception(*exc_info)


class WorkerThread(threading.Thread):
    """Background worker thread.

    Takes work requests from ``requestQueue``, runs them, and puts
    ``(request, result)`` tuples on ``resultQueue``. The thread is a daemon
    and starts itself as soon as it is constructed.
    """

    def __init__(self, requestQueue, resultQueue, poll_timeout=5, **kwds):
        """
        :param requestQueue: queue the worker reads WorkRequest objects from.
        :param resultQueue: queue the worker writes (request, result) to.
        :param poll_timeout: seconds to block on ``requestQueue`` before
            re-checking the dismissed flag.
        :param kwds: extra keyword arguments for ``threading.Thread``.
        """
        threading.Thread.__init__(self, **kwds)
        # Daemon thread: does not keep the interpreter alive on exit.
        self.daemon = True
        self._requestQueue = requestQueue
        self._resultQueue = resultQueue
        self._poll_timeout = poll_timeout
        # Set once the thread has been dismissed; initially clear.
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Process requests in a loop until the thread is dismissed."""
        while True:
            if self._dismissed.is_set():
                break
            try:
                # Block until a request arrives or the poll timeout expires,
                # so the dismissed flag gets re-checked periodically.
                request = self._requestQueue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            # The thread may have been dismissed while it was waiting; hand
            # the request back so another worker can take it.
            if self._dismissed.is_set():
                self._requestQueue.put(request)
                break
            try:
                # Run the callable and publish (request, result).
                result = request.callable(*request.args, **request.kwds)
                print(self.name)
                self._resultQueue.put((request, result))
            except BaseException:
                # Deliberately broad: every failure must be reported on the
                # result queue, otherwise the request would stay pending.
                request.exception = True
                self._resultQueue.put((request, sys.exc_info()))

    def dismiss(self):
        """Flag the thread to exit once its current request is finished."""
        self._dismissed.set()


class WorkRequest(object):
    """A unit of work to be executed by a worker thread.

    :param callable_: function that performs the work.
    :param args: positional arguments for ``callable_`` (list).
    :param kwds: keyword arguments for ``callable_`` (dict).
    :param requestID: optional hashable id; its hash is stored. Defaults to
        ``id(self)``.
    :param callback: called as ``callback(request, result)`` when a result
        is polled.
    :param exc_callback: called as ``exc_callback(request, exc_info)`` when
        the callable raised.
    :raises TypeError: if ``requestID`` is not hashable.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=_handle_thread_exception):
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestId must be hashable")
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        return "WorkRequest id=%s args=%r kwargs=%r exception=%s" % (
            self.requestID, self.args, self.kwds, self.exception)


class ThreadPool(object):
    """A pool of WorkerThread objects fed from a shared request queue.

    :param num_workers: number of worker threads created initially.
    :param q_size: maximum size of the request queue (0 = unbounded).
    :param resq_size: maximum size of the result queue (0 = unbounded).
    :param poll_timeout: timeout, in seconds, each worker uses while waiting
        on the request queue.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        self._requestQueue = Queue.Queue(q_size)
        self._resultQueue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # Pending requests keyed by requestID.
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Create ``num_workers`` additional worker threads."""
        for _ in range(num_workers):
            self.workers.append(
                WorkerThread(self._requestQueue, self._resultQueue,
                             poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Dismiss up to ``num_workers`` workers.

        With ``do_join`` true the dismissed threads are joined right away;
        otherwise they are remembered in ``dismissedWorkers`` for a later
        ``joinAllDismissedWorkers()``.
        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)
        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Join every previously dismissed worker thread."""
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Queue ``request`` and register it as pending.

        When the request queue is full (``q_size`` reached) this blocks
        until a slot is free or ``timeout`` expires, according to ``block``.
        """
        assert isinstance(request, WorkRequest)
        assert not getattr(request, 'exception', None)
        self._requestQueue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process available results and invoke their callbacks.

        :param block: if true, wait for results instead of returning when
            the result queue is empty.
        :raises NoResultsPending: when no request is pending any more.
        :raises NoWorkersAvailable: when blocking and no workers are left.
        """
        while True:
            if not self.workRequests:
                raise NoResultsPending
            elif block and not self.workers:
                raise NoWorkersAvailable
            try:
                request, result = self._resultQueue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                if request.callback and not (
                        request.exception and request.exc_callback):
                    request.callback(request, result)
            finally:
                # The result has been consumed: always unregister the
                # request, even if a callback raised, so wait() cannot
                # block forever on a request that will never return again.
                self.workRequests.pop(request.requestID, None)

    def wait(self):
        """Block until every pending request has been processed."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break

    def workersize(self):
        """Return the number of active (non-dismissed) workers."""
        return len(self.workers)

    def stop(self):
        """Dismiss and join all threads so that every worker has finished."""
        self.dismissWorkers(self.workersize(), True)
        self.joinAllDismissedWorkers()


if __name__ == '__main__':
    import random
    import time
    import datetime

    def do_work(data):
        time.sleep(random.randint(1, 3))
        res = str(datetime.datetime.now()) + "" + str(data)
        return res

    def print_result(request, result):
        print("---Result from request %s : %r" % (request.requestID, result))

    main = ThreadPool(3)
    for i in range(40):
        req = WorkRequest(do_work, args=[i], kwds={}, callback=print_result)
        main.putRequest(req)
        print("work request #%s added." % req.requestID)

    print("%s %s %s" % ('-' * 20, main.workersize(), '-' * 20))

    counter = 0
    while True:
        try:
            time.sleep(0.5)
            main.poll()
            if counter == 5:
                print("Add 3 more workers threads")
                main.createWorkers(3)
                print("%s %s %s" % ('-' * 20, main.workersize(), '-' * 20))
            if counter == 10:
                print("dismiss 2 workers threads")
                main.dismissWorkers(2)
                print("%s %s %s" % ('-' * 20, main.workersize(), '-' * 20))
            counter += 1
        except NoResultsPending:
            print("no pending results")
            break

    main.stop()
    print("Stop")
import contextlib
import logging
import queue
import threading
import time

# Sentinel placed on the task queue to tell a worker thread to exit.
StopEvent = object()

logger = logging.getLogger(__name__)


class ThreadPool(object):
    """Thread pool that creates worker threads lazily, up to ``max_num``.

    :param max_num: maximum number of worker threads.
    :param max_task_num: maximum number of queued tasks; falsy means an
        unbounded queue.
    """

    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []  # threads created so far and still alive
        self.free_list = []  # threads currently waiting for a task

    def run(self, func, args, callback=None):
        """Submit a task to the pool.

        :param func: task function.
        :param args: positional arguments for ``func`` (tuple or list).
        :param callback: optional ``callback(success, result)`` invoked after
            the task ran; ``success`` is False and ``result`` is None when
            the task raised.
        :return: always None; the task is silently ignored once ``close()``
            has been called.
        """
        if self.cancel:
            return
        # Spawn a new worker only when nobody is idle and the limit allows.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """Create, register and start one worker thread."""
        worker = threading.Thread(target=self.call)
        # Register before starting so run() and close() always see an
        # accurate thread count, instead of racing with thread start-up.
        self.generate_list.append(worker)
        try:
            worker.start()
        except BaseException:
            self.generate_list.remove(worker)
            raise

    def call(self):
        """Worker loop: fetch tasks from the queue and execute them."""
        current_thread = threading.current_thread()
        # Threads started by generate_thread() are already registered; this
        # covers call() being used as the target of an external thread.
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        event = self.q.get()
        while event is not StopEvent:
            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception:
                success = False
                result = None
                if callback is None:
                    # Nobody will be told about the failure: log it.
                    logger.exception("task %r raised", func)

            if callback is not None:
                try:
                    callback(success, result)
                except Exception:
                    # Best effort: a broken callback must not kill the worker.
                    logger.exception("callback %r raised", callback)

            # Marked as free while waiting for the next task.
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()

        # Stop sentinel received: unregister the thread and exit.
        self.generate_list.remove(current_thread)

    def close(self):
        """Refuse new tasks and stop all threads once queued tasks are done."""
        self.cancel = True
        # One sentinel per registered worker, queued behind pending tasks.
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """Stop all threads after their current task, dropping queued tasks.

        Blocks until every worker thread has exited.
        """
        self.terminal = True
        while self.generate_list:
            # Wake idle workers blocked on the queue; busy workers exit on
            # their own when they see ``self.terminal``.
            try:
                self.q.put(StopEvent, timeout=0.01)
            except queue.Full:
                pass
            # Yield briefly instead of flooding the queue with sentinels.
            time.sleep(0.01)
        # Discard tasks and sentinels that were never consumed.
        while True:
            try:
                self.q.get_nowait()
            except queue.Empty:
                break

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """Keep ``worker_thread`` in ``state_list`` for the ``with`` body."""
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


# How to use
def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)


if __name__ == "__main__":
    pool = ThreadPool(5)
    for i in range(30):
        ret = pool.run(action, (i,), callback)

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # Stop the non-daemon workers, otherwise the process never exits.
    # Use pool.terminate() instead to drop tasks that are still queued.
    pool.close()
标签:
原文地址:http://www.cnblogs.com/jishuweiwang/p/5697610.html