码迷,mamicode.com
首页 > 编程语言 > 详细

一个简单的python线程池框架

时间:2015-11-02 01:33:00      阅读:315      评论:0      收藏:0      [点我收藏+]

标签:

  初学Python,实现了一个简单的线程池框架,线程池中除Workers(工作线程)外,还单独创建了一个日志线程,用于日志的输出.线程间采用Queue方式进行通信.

代码如下:

  

  1 #!/usr/bin/env python
  2 # -*- coding:utf-8 -*-
  3 
  4 __author__ = "pandaychen"
  5 
  6 import Queue
  7 import sys
  8 import os
  9 import threading
 10 import time
 11 import signal
 12 
 13 def handler():
 14     print "press CTRL+C to end...."
 15     sys.exit(1)
 16 
 17 
 18 def call_function(para):
 19     time.sleep(5)
 20     return para
 21 
 22 
 23 def LoggingFun(t_filename,t_logcontent):
 24     logpath = ./log/
 25     curdate = time.strftime("%Y%m%d")
 26     newpath = ./log/+t_filename+_+curdate
 27 
 28     if os.path.exists(logpath):
 29         pass
 30     else:
 31         os.mkdir(logpath)
 32 
 33     try:
 34         filehd = open(newpath,a+)
 35         newcontent = [+str(time.strftime("%Y-%m-%d %H:%M:%S"))+]+t_logcontent+\n
 36         filehd.writelines(newcontent)
 37         filehd.close()
 38     except Exception,e:
 39         pass
 40 
 41 class LogThread(threading.Thread):
 42     def __init__(self,logQueue,**kwds):
 43         threading.Thread.__init__(self,**kwds)
 44         self.logQueue = logQueue
 45         self.setDaemon(True)
 46 
 47     def run(self):
 48         while 1:
 49             #log = self.logQueue.get(False)
 50             log = self.logQueue.get()
 51             if log:
 52                 LoggingFun("test",log)
 53                 pass
 54             else:
 55                 LoggingFun("test","log thread sleep 1s")
 56                 time.sleep(1)
 57 
 58 #封装为一个线程类
 59 class Worker(threading.Thread):    # 处理工作请求
 60     def __init__(self, workQueue, resultQueue,logQueue, threadid,**kwds):
 61         threading.Thread.__init__(self, **kwds)
 62         self.setDaemon(True)
 63         self.workQueue = workQueue
 64         self.resultQueue = resultQueue
 65         self.logQueue = logQueue
 66         self.threadid = threadid
 67 
 68     def run(self):
 69         while 1:
 70             try:
 71                 callable, args, kwds = self.workQueue.get(False)    # get a task
 72                 res = callable(*args, **kwds)
 73                 strres = "thread:"+ str(self.threadid) + " done,"+"args:"+str(res)
 74 
 75                 self.logQueue.put(strres)
 76                 self.resultQueue.put(res)    # put result
 77             except Queue.Empty:
 78                 break
 79 
 80 class WorkManagerPool:    # 线程池管理,创建
 81     def __init__(self, num_of_workers=10):
 82         self.workQueue = Queue.Queue()    # 请求队列
 83         self.resultQueue = Queue.Queue()    # 输出结果的队列
 84         self.logQueue = Queue.Queue()
 85         self.workers = []
 86         self._recruitThreads(num_of_workers)
 87 
 88     def _recruitThreads(self, num_of_workers):
 89         for i in range(num_of_workers):
 90             worker = Worker(self.workQueue, self.resultQueue,self.logQueue,i)    # 创建工作线程
 91             worker.setDaemon(True)
 92             self.workers.append(worker)    # 加入到线程队列
 93 
 94         logthread = LogThread(self.logQueue)
 95         self.workers.append(logthread)
 96 
 97 
 98     def start(self):
 99         for w in self.workers:
100             w.start()
101 
102     def wait_for_complete(self):
103         while len(self.workers):
104             worker = self.workers.pop()    # 从池中取出一个线程处理请求
105             worker.join()
106             if worker.isAlive() and not self.workQueue.empty():
107                 self.workers.append(worker)    # 重新加入线程池中
108                 print All jobs were complete.
109 
110 
111     def add_job(self, callable, *args, **kwds):
112         self.workQueue.put((callable, args, kwds))    # 向工作队列中加入请求
113 
114     def get_result(self, *args, **kwds):
115         return self.resultQueue.get(*args, **kwds)
116 
117 
118 
def main():
    """Run the demo: push 1000 jobs through the pool and print the time.

    The number of worker threads is read from the first command-line
    argument; 10 is used when the argument is missing or not an integer.
    """
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    try:
        num_of_threads = int(sys.argv[1])
    except (IndexError, ValueError):
        num_of_threads = 10

    # Everything below runs for both outcomes of the argument parsing; it
    # must not sit inside the except branch, or a valid thread count on
    # the command line would skip the whole run.
    start = time.time()
    workermanagepool = WorkManagerPool(num_of_threads)
    urls = ["http://bbs.qcloud.com"] * 1000
    for i in urls:
        workermanagepool.add_job(call_function, i)

    workermanagepool.start()
    workermanagepool.wait_for_complete()
    print(time.time() - start)
137 
# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()

 

一个简单的python线程池框架

标签:

原文地址:http://www.cnblogs.com/panada/p/4929067.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!