标签:模块 处理 time clear 针对 free try append 创建
简单线程池:
#!/usr/bin/env python# Version = 3.5.2# __auth__ = ‘无名小妖‘import queueimport threadingimport timeclass ThreadPool:"""简易线程池类,缺陷:1.线程无法重用2.初始线程最大化,可能导致浪费"""def __init__(self, maxsize=5):"""通过队列实现线程池:param maxsize: 线程池最大值"""self.maxsize = maxsizeself._q = queue.Queue(maxsize)# 将线程类循环放入队列for i in range(maxsize):self._q.put(threading.Thread)def get_thread(self):"""从队列中取出线程类:return: 线程类"""return self._q.get()def add_thread(self):"""放入类:return: none"""self._q.put(threading.Thread)# 实例化线程池pool = ThreadPool()# 测试函数def task(a, p):print(a)time.sleep(1)p.add_thread()for i in range(100):t = pool.get_thread()t_obj = t(target=task, args=(i, pool))t_obj.start()
#!/usr/bin/env python# Version = 3.5.2# __auth__ = ‘无名小妖‘import queueimport threadingimport contextlibimport time# 静态字段,线程停止标记StopEvent = object()class ThreadPool(object):def __init__(self, max_num, max_task_num = None):""":param max_num: 线程池大小:param max_task_num: 任务数量,默认无限"""if max_task_num:self.q = queue.Queue(max_task_num)else:self.q = queue.Queue()self.max_num = max_numself.cancel = Falseself.terminal = Falseself.generate_list = [] # 当前创建的线程self.free_list = [] # 当前空闲的线程def run(self, func, args, callback=None):"""线程池执行一个任务:param func: 任务函数:param args: 任务函数所需参数:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数):return: 如果线程池已经终止,则返回True否则None"""if self.cancel:return# 判断 如果没有空闲线程 并且 已经创建的线程小于最大线程数,创建新线程if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:self.generate_thread()# 把任务放队列w = (func, args, callback,)self.q.put(w)def generate_thread(self):"""创建一个线程,并执行call方法"""t = threading.Thread(target=self.call)t.start()def call(self):"""循环去获取任务函数并执行任务函数"""current_thread = threading.currentThread()self.generate_list.append(current_thread)event = self.q.get() # 取任务while event != StopEvent: # 循环执行任务func, arguments, callback = eventtry:result = func(*arguments) # 任务函数success = Trueexcept Exception as e:success = Falseresult = Noneif callback is not None:try:callback(success, result) # 回调函数except Exception as e:pass# 执行完任务,将线程置为空闲with self.worker_state(self.free_list, current_thread):if self.terminal:event = StopEventelse:event = self.q.get()else:self.generate_list.remove(current_thread)def close(self):"""执行完所有的任务后,所有线程停止"""self.cancel = Truefull_size = len(self.generate_list)while full_size:self.q.put(StopEvent) # 将终止标识放入队列full_size -= 1def terminate(self):"""无论是否还有任务,终止线程"""self.terminal = Truewhile self.generate_list:self.q.put(StopEvent)self.q.queue.clear()""" 经常使用到的with场景是(打开文件进行文件处理,然后隐式地执行了文件句柄的关闭,同样适合socket之类的,这些类都提供了对with的支持)contextlib是为了加强with语句,提供上下文机制的模块,它是通过Generator实现的。contextlib中的contextmanager作为装饰器来提供一种针对函数级别的上下文管理机制 ."""@contextlib.contextmanagerdef worker_state(self, state_list, worker_thread):"""用于记录线程中正在等待的线程数"""state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread)# 实例化线程池pool = ThreadPool(3)def callback(status, result):# status, execute action status# result, execute action return valueif status == True:print(‘Completed!‘)def action(i):print(i)for i in range(30):ret = pool.run(action, (i,), callback)time.sleep(5)print(len(pool.generate_list), len(pool.free_list))pool.close()# pool.terminate()
标签:模块 处理 time clear 针对 free try append 创建
原文地址:http://www.cnblogs.com/wumingxiaoyao/p/7047857.html