标签:
线程池:
版本一:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
Version 1: ration thread creation with a bounded queue of "slots".

The queue is pre-filled with ``max_num`` references to the
``threading.Thread`` class.  Taking one out grants permission to start a
thread; the thread's target puts one back when it is done.

How to use:

    import time

    pool = ThreadPool(10)

    def func(arg, p):
        try:
            print(arg)
            time.sleep(2)
        finally:
            # Always hand the slot back, even if the work above raised;
            # otherwise the pool permanently loses one slot.
            p.add_thread()

    for i in range(30):
        thread = pool.get_thread()
        t = thread(target=func, args=(i, pool))
        t.start()
"""

import threading

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3: same module under its new name


class ThreadPool(object):
    """Hand out at most ``max_num`` thread slots at a time."""

    def __init__(self, max_num=20):
        """Create the slot queue and fill it with ``max_num`` slots.

        :param max_num: number of slots placed in the queue, which is also
            the queue's capacity.
        """
        self.queue = Queue.Queue(max_num)
        for _ in range(max_num):
            self.queue.put(threading.Thread)

    def get_thread(self):
        """Take one slot, blocking while none is available.

        :return: the ``threading.Thread`` class, for the caller to
            instantiate and start.
        """
        return self.queue.get()

    def add_thread(self):
        """Return one slot to the pool (blocks if the queue is already full)."""
        self.queue.put(threading.Thread)
版本二:
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
Custom ThreadPool (version 2).

How to use:

    pool = ThreadPool(1)

    def callback(status, result):
        # status: True if the action returned normally, False if it raised
        # result: the action's return value (None when it raised)
        pass

    def action(i):
        pass

    for i in range(20):
        if pool.run(action, (i,), callback):
            # run() returns True once the pool has been cancelled
            break

    pool.terminal()
    print('end')
"""

import contextlib
import logging
import threading

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3: same module under its new name

logger = logging.getLogger(__name__)

# Sentinel placed on the task queue to tell one worker to leave its loop.
StopEvent = object()


class ThreadPool(object):
    """Thread pool that creates workers lazily, up to ``max_num`` of them."""

    def __init__(self, max_num):
        """
        :param max_num: maximum number of worker threads; also used as the
            capacity of the task queue.
        """
        self.q = Queue.Queue(max_num)
        self.max_num = max_num
        # Set by terminal(); run() refuses new tasks once it is True.
        self.cancel = False
        # Thread objects of every worker that has been created and not exited.
        self.generate_list = []
        # Workers currently blocked waiting for their next task.
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.

        :param func: task function
        :param args: positional arguments for the task function
        :param callback: called as ``callback(success, result)`` after the
            task finishes, where ``success`` tells whether ``func`` returned
            normally and ``result`` is its return value (None on failure).
            Defaults to None, meaning no callback is invoked.
        :return: True if the pool has already been cancelled (the task is
            not queued), otherwise None.
        """
        if self.cancel:
            return True
        # Only grow the pool when nobody is idle and the limit allows it.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """
        Create and start one worker thread.

        The thread is registered before it starts so that the size check in
        run() and the count in terminal() see it immediately.
        """
        t = threading.Thread(target=self.call)
        self.generate_list.append(t)
        t.start()

    def call(self):
        """
        Worker loop: fetch tasks from the queue and execute them until a
        StopEvent is received.
        """
        current_thread = threading.current_thread()
        # Covers the case where call() runs on a thread that was not created
        # by generate_thread().
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    logger.exception("task %r raised", func)
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # A failing callback must not kill the worker.
                        logger.exception("callback %r raised", callback)

                # Listed in free_list only while waiting for the next task.
                with self.worker_state(self.free_list, current_thread):
                    event = self.q.get()
        finally:
            self.generate_list.remove(current_thread)

    def terminal(self):
        """
        Stop every worker: reject further run() calls and queue one
        StopEvent per registered worker.  Tasks queued earlier are still
        executed first because the queue is FIFO.
        """
        self.cancel = True
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep ``worker_thread`` in ``state_list`` for the duration of the
        ``with`` block (used to track workers that are waiting for a task).
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
Custom ThreadPool (version 3): adds close() and terminate().

How to use:

    pool = ThreadPool(5)

    def callback(status, result):
        # status: True if the action returned normally, False if it raised
        # result: the action's return value (None when it raised)
        pass

    def action(i):
        time.sleep(1)
        print(i)

    for i in range(30):
        ret = pool.run(action, (i,), callback)

    # pool.close()
    # pool.terminate()
"""

import contextlib
import logging
import queue
import threading
import time

logger = logging.getLogger(__name__)

# Sentinel placed on the task queue to tell a worker to leave its loop.
StopEvent = object()


class ThreadPool(object):
    """Thread pool that creates workers lazily, up to ``max_num`` of them."""

    def __init__(self, max_num):
        """
        :param max_num: maximum number of worker threads.
        """
        self.q = queue.Queue()
        self.max_num = max_num

        # Set by terminate(); workers exit instead of fetching another task.
        self.terminal = False
        # Thread objects of every worker that has been created and not exited.
        self.generate_list = []
        # Workers currently between tasks (waiting for the next one).
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool.

        :param func: task function
        :param args: positional arguments for the task function
        :param callback: called as ``callback(success, result)`` after the
            task finishes, where ``success`` tells whether ``func`` returned
            normally and ``result`` is its return value (None on failure).
            Defaults to None, meaning no callback is invoked.
        :return: True if the pool has been terminated (the task is not
            queued), otherwise None.
        """
        if self.terminal:
            return True
        # Only grow the pool when nobody is idle and the limit allows it.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """
        Create and start one worker thread.

        The thread is registered before it starts so that run(), close() and
        terminate() see an accurate worker count immediately.
        """
        t = threading.Thread(target=self.call)
        self.generate_list.append(t)
        t.start()

    def call(self):
        """
        Worker loop: fetch tasks from the queue and execute them until a
        StopEvent is received or the pool is terminated.
        """
        current_thread = threading.current_thread()
        # Covers the case where call() runs on a thread that was not created
        # by generate_thread().
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    logger.exception("task %r raised", func)
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # A failing callback must not kill the worker.
                        logger.exception("callback %r raised", callback)

                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
        finally:
            self.generate_list.remove(current_thread)
            if self.terminal:
                # Pass the stop signal on so that the next worker blocked in
                # q.get() wakes up too; one signal then reaches all of them.
                self.q.put(StopEvent)

    def close(self):
        """
        Let the workers stop after all queued tasks have run: queue one
        StopEvent per registered worker behind the pending tasks.
        """
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        Stop the workers whether or not tasks are still queued.

        Pending tasks are discarded, a task that is already running is
        allowed to finish, and the call blocks until every other worker
        thread has exited.  At least one StopEvent is intentionally left in
        the queue so that a worker reaching q.get() late still exits.
        """
        self.terminal = True
        self._discard_pending()
        self.q.put(StopEvent)

        me = threading.current_thread()
        for worker in list(self.generate_list):
            # A worker cannot join itself (terminate() called from a task),
            # and a thread that has not started yet cannot be joined.
            if worker is not me and worker.is_alive():
                worker.join()

    def _discard_pending(self):
        """Remove everything currently waiting in the task queue."""
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep ``worker_thread`` in ``state_list`` for the duration of the
        ``with`` block (used to track workers that are between tasks).
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
Custom ThreadPool (version 4): optional bound on the number of queued tasks.

Run this file directly to see the demo at the bottom.
"""

import contextlib
import logging
import queue
import threading
import time

logger = logging.getLogger(__name__)

# Sentinel placed on the task queue to tell a worker to leave its loop.
StopEvent = object()


class ThreadPool(object):
    """Thread pool that creates workers lazily, up to ``max_num`` of them."""

    def __init__(self, max_num, max_task_num=None):
        """
        :param max_num: maximum number of worker threads.
        :param max_task_num: capacity of the task queue; a falsy value
            (the default) means the queue is unbounded.
        """
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        # Set by close(); run() refuses new tasks once it is True.
        self.cancel = False
        # Set by terminate(); workers exit instead of fetching another task.
        self.terminal = False
        # Thread objects of every worker that has been created and not exited.
        self.generate_list = []
        # Workers currently between tasks (waiting for the next one).
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        Submit one task to the pool (blocks while a bounded queue is full).

        :param func: task function
        :param args: positional arguments for the task function
        :param callback: called as ``callback(success, result)`` after the
            task finishes, where ``success`` tells whether ``func`` returned
            normally and ``result`` is its return value (None on failure).
            Defaults to None, meaning no callback is invoked.
        :return: True if the pool has been closed or terminated (the task
            is not queued), otherwise None.
        """
        if self.cancel or self.terminal:
            return True
        # Only grow the pool when nobody is idle and the limit allows it.
        if not self.free_list and len(self.generate_list) < self.max_num:
            self.generate_thread()
        self.q.put((func, args, callback))

    def generate_thread(self):
        """
        Create and start one worker thread.

        The thread is registered before it starts so that run(), close() and
        terminate() see an accurate worker count immediately.
        """
        t = threading.Thread(target=self.call)
        self.generate_list.append(t)
        t.start()

    def call(self):
        """
        Worker loop: fetch tasks from the queue and execute them until a
        StopEvent is received or the pool is terminated.
        """
        current_thread = threading.current_thread()
        # Covers the case where call() runs on a thread that was not created
        # by generate_thread().
        if current_thread not in self.generate_list:
            self.generate_list.append(current_thread)

        try:
            event = self.q.get()
            while event is not StopEvent:
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception:
                    logger.exception("task %r raised", func)
                    success = False
                    result = None

                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception:
                        # A failing callback must not kill the worker.
                        logger.exception("callback %r raised", callback)

                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
        finally:
            self.generate_list.remove(current_thread)
            if self.terminal:
                # Pass the stop signal on so that the next worker blocked in
                # q.get() wakes up too; one signal then reaches all of them.
                self._offer_stop()

    def close(self):
        """
        Let the workers stop after all queued tasks have run: reject further
        run() calls and queue one StopEvent per registered worker behind the
        pending tasks.
        """
        self.cancel = True
        for _ in range(len(self.generate_list)):
            self.q.put(StopEvent)

    def terminate(self):
        """
        Stop the workers whether or not tasks are still queued.

        Pending tasks are discarded, a task that is already running is
        allowed to finish, and the call blocks until every other worker
        thread has exited.  At least one StopEvent is intentionally left in
        the queue so that a worker reaching q.get() late still exits.
        """
        self.terminal = True
        self._discard_pending()
        self._offer_stop()

        me = threading.current_thread()
        for worker in list(self.generate_list):
            # A worker cannot join itself (terminate() called from a task),
            # and a thread that has not started yet cannot be joined.
            if worker is not me and worker.is_alive():
                worker.join()

    def _discard_pending(self):
        """Remove everything currently waiting in the task queue."""
        try:
            while True:
                self.q.get_nowait()
        except queue.Empty:
            pass

    def _offer_stop(self):
        """Queue one StopEvent without ever blocking on a bounded queue."""
        try:
            self.q.put_nowait(StopEvent)
        except queue.Full:
            # A full queue already holds something that will wake a waiting
            # worker, and that worker offers the signal again when it exits.
            pass

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        Keep ``worker_thread`` in ``state_list`` for the duration of the
        ``with`` block (used to track workers that are between tasks).
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)


if __name__ == "__main__":
    # How to use

    pool = ThreadPool(5)

    def callback(status, result):
        # status: True if the action returned normally, False if it raised
        # result: the action's return value (None when it raised)
        pass

    def action(i):
        print(i)

    for i in range(30):
        ret = pool.run(action, (i,), callback)

    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    # Workers are non-daemon threads blocked in q.get(); without close()
    # (or terminate()) the interpreter would never exit.
    pool.close()
    # pool.terminate()
更多参见:twisted.python.threadpool
上下文管理:https://docs.python.org/2/library/contextlib.html
更多详情请关注cnblogs/武sir
标签:
原文地址:http://www.cnblogs.com/237325670qqcom/p/5654633.html