码迷,mamicode.com
首页 > 编程语言 > 详细

python线程池

时间:2016-07-08 23:15:13      阅读:332      评论:0      收藏:0      [点我收藏+]

标签:

线程池:

版本一:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import Queue
import threading
 
 
class ThreadPool(object):
 
    def __init__(self, max_num=20):
        self.queue = Queue.Queue(max_num)
        for i in xrange(max_num):
            self.queue.put(threading.Thread)
 
    def get_thread(self):
        return self.queue.get()
 
    def add_thread(self):
        self.queue.put(threading.Thread)
 
"""
pool = ThreadPool(10)
 
def func(arg, p):
    print arg
    import time
    time.sleep(2)
    p.add_thread()
 
 
for i in xrange(30):
    thread = pool.get_thread()
    t = thread(target=func, args=(i, pool))
    t.start()
"""

版本二:

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3  
  4 """
  5 custom ThreadPool
  6  
  7 How to use:
  8  
  9 pool = ThreadPool(1)
 10  
 11 def callback(status, result):
 12     # status, execute action status
 13     # result, execute action return value
 14  
 15     pass
 16 def action(i):
 17     pass
 18  
 19 for i in range(20):
 20     if pool.stop:
 21         pool.terminal()
 22         break
 23     ret = pool.run(action, (i,), callback)
 24  
 25 print ‘end‘
 26  
 27  
 28 """
 29  
 30  
 31 import Queue
 32 import threading
 33 import contextlib
 34  
 35 StopEvent = object()
 36  
 37  
 38 class ThreadPool(object):
 39  
 40     def __init__(self, max_num):
 41         self.q = Queue.Queue(max_num)
 42         self.max_num = max_num
 43         self.cancel = False
 44         self.generate_list = []
 45         self.free_list = []
 46  
 47     def run(self, func, args, callback=None):
 48         """
 49         线程池执行一个任务
 50         :param func: 任务函数
 51         :param args: 任务函数所需参数
 52         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 53         :return: 如果线程池已经终止,则返回True否则None
 54         """
 55         if self.cancel:
 56             return True
 57         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 58             self.generate_thread()
 59         w = (func, args, callback,)
 60         self.q.put(w)
 61  
 62     def generate_thread(self):
 63         """
 64         创建一个线程
 65         """
 66         t = threading.Thread(target=self.call)
 67         t.start()
 68  
 69     def call(self):
 70         """
 71         循环去获取任务函数并执行任务函数
 72         """
 73         current_thread = threading.currentThread
 74         self.generate_list.append(current_thread)
 75  
 76         event = self.q.get()
 77         while event != StopEvent:
 78             func, arguments, callback = event
 79             try:
 80                 result = func(*arguments)
 81                 success = True
 82             except Exception, e:
 83                 success = False
 84                 result = None
 85  
 86             if callback is not None:
 87                 try:
 88                     callback(success, result)
 89                 except Exception, e:
 90                     pass
 91  
 92             with self.worker_state(self.free_list, current_thread):
 93                 event = self.q.get()
 94         else:
 95  
 96             self.generate_list.remove(current_thread)
 97  
 98     def terminal(self):
 99         """
100         终止线程池中的所有线程
101         """
102         self.cancel = True
103         full_size = len(self.generate_list)
104         while full_size:
105             self.q.put(StopEvent)
106             full_size -= 1
107  
108     @contextlib.contextmanager
109     def worker_state(self, state_list, worker_thread):
110         """
111         用于记录线程中正在等待的线程数
112         """
113         state_list.append(worker_thread)
114         try:
115             yield
116         finally:
117             state_list.remove(worker_thread)
技术分享
  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 
 12 class ThreadPool(object):
 13 
 14     def __init__(self, max_num):
 15         self.q = queue.Queue()
 16         self.max_num = max_num
 17 
 18         self.terminal = False
 19         self.generate_list = []
 20         self.free_list = []
 21 
 22     def run(self, func, args, callback=None):
 23         """
 24         线程池执行一个任务
 25         :param func: 任务函数
 26         :param args: 任务函数所需参数
 27         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 28         :return: 如果线程池已经终止,则返回True否则None
 29         """
 30 
 31         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 32             self.generate_thread()
 33         w = (func, args, callback,)
 34         self.q.put(w)
 35 
 36     def generate_thread(self):
 37         """
 38         创建一个线程
 39         """
 40         t = threading.Thread(target=self.call)
 41         t.start()
 42 
 43     def call(self):
 44         """
 45         循环去获取任务函数并执行任务函数
 46         """
 47         current_thread = threading.currentThread
 48         self.generate_list.append(current_thread)
 49 
 50         event = self.q.get()
 51         while event != StopEvent:
 52 
 53             func, arguments, callback = event
 54             try:
 55                 result = func(*arguments)
 56                 success = True
 57             except Exception as e:
 58                 success = False
 59                 result = None
 60 
 61             if callback is not None:
 62                 try:
 63                     callback(success, result)
 64                 except Exception as e:
 65                     pass
 66 
 67             with self.worker_state(self.free_list, current_thread):
 68                 if self.terminal:
 69                     event = StopEvent
 70                 else:
 71                     event = self.q.get()
 72         else:
 73 
 74             self.generate_list.remove(current_thread)
 75 
 76     def close(self):
 77         """
 78         执行完所有的任务后,所有线程停止
 79         """
 80         full_size = len(self.generate_list)
 81         while full_size:
 82             self.q.put(StopEvent)
 83             full_size -= 1
 84 
 85     def terminate(self):
 86         """
 87         无论是否还有任务,终止线程
 88         """
 89         self.terminal = True
 90 
 91         while self.generate_list:
 92             self.q.put(StopEvent)
 93 
 94         self.q.empty()
 95 
 96 
 97     @contextlib.contextmanager
 98     def worker_state(self, state_list, worker_thread):
 99         """
100         用于记录线程中正在等待的线程数
101         """
102         state_list.append(worker_thread)
103         try:
104             yield
105         finally:
106             state_list.remove(worker_thread)
107 
108 
109 """
110 # How to use
111 
112 
113 pool = ThreadPool(5)
114 
115 def callback(status, result):
116     # status, execute action status
117     # result, execute action return value
118     pass
119 
120 
121 def action(i):
122     time.sleep(1)
123     print(i)
124 
125 for i in range(30):
126     ret = pool.run(action, (i,), callback)
127 
128 # pool.close()
129 # pool.terminate()
130 """
更新1
技术分享
  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 import queue
  5 import threading
  6 import contextlib
  7 import time
  8 
  9 StopEvent = object()
 10 
 11 
 12 class ThreadPool(object):
 13 
 14     def __init__(self, max_num, max_task_num = None):
 15         if max_task_num:
 16             self.q = queue.Queue(max_task_num)
 17         else:
 18             self.q = queue.Queue()
 19         self.max_num = max_num
 20         self.cancel = False
 21         self.terminal = False
 22         self.generate_list = []
 23         self.free_list = []
 24 
 25     def run(self, func, args, callback=None):
 26         """
 27         线程池执行一个任务
 28         :param func: 任务函数
 29         :param args: 任务函数所需参数
 30         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
 31         :return: 如果线程池已经终止,则返回True否则None
 32         """
 33         if self.cancel:
 34             return
 35         if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
 36             self.generate_thread()
 37         w = (func, args, callback,)
 38         self.q.put(w)
 39 
 40     def generate_thread(self):
 41         """
 42         创建一个线程
 43         """
 44         t = threading.Thread(target=self.call)
 45         t.start()
 46 
 47     def call(self):
 48         """
 49         循环去获取任务函数并执行任务函数
 50         """
 51         current_thread = threading.currentThread
 52         self.generate_list.append(current_thread)
 53 
 54         event = self.q.get()
 55         while event != StopEvent:
 56 
 57             func, arguments, callback = event
 58             try:
 59                 result = func(*arguments)
 60                 success = True
 61             except Exception as e:
 62                 success = False
 63                 result = None
 64 
 65             if callback is not None:
 66                 try:
 67                     callback(success, result)
 68                 except Exception as e:
 69                     pass
 70 
 71             with self.worker_state(self.free_list, current_thread):
 72                 if self.terminal:
 73                     event = StopEvent
 74                 else:
 75                     event = self.q.get()
 76         else:
 77 
 78             self.generate_list.remove(current_thread)
 79 
 80     def close(self):
 81         """
 82         执行完所有的任务后,所有线程停止
 83         """
 84         self.cancel = True
 85         full_size = len(self.generate_list)
 86         while full_size:
 87             self.q.put(StopEvent)
 88             full_size -= 1
 89 
 90     def terminate(self):
 91         """
 92         无论是否还有任务,终止线程
 93         """
 94         self.terminal = True
 95 
 96         while self.generate_list:
 97             self.q.put(StopEvent)
 98 
 99         self.q.empty()
100 
101     @contextlib.contextmanager
102     def worker_state(self, state_list, worker_thread):
103         """
104         用于记录线程中正在等待的线程数
105         """
106         state_list.append(worker_thread)
107         try:
108             yield
109         finally:
110             state_list.remove(worker_thread)
111 
112 
113 
114 # How to use
115 
116 
117 pool = ThreadPool(5)
118 
119 def callback(status, result):
120     # status, execute action status
121     # result, execute action return value
122     pass
123 
124 
125 def action(i):
126     print(i)
127 
128 for i in range(30):
129     ret = pool.run(action, (i,), callback)
130 
131 time.sleep(5)
132 print(len(pool.generate_list), len(pool.free_list))
133 print(len(pool.generate_list), len(pool.free_list))
134 # pool.close()
135 # pool.terminate()
更新2

更多参见:twisted.python.threadpool

上下文管理:https://docs.python.org/2/library/contextlib.html

更多详情请关注cnblogs/武sir

python线程池

标签:

原文地址:http://www.cnblogs.com/237325670qqcom/p/5654633.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!