标签:
初学 Python,实现了一个简单的线程池框架。线程池中除 Workers(工作线程)外,还单独创建了一个日志线程,用于日志的输出。线程间采用 Queue 方式进行通信。
代码如下:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
# Minimal thread-pool demo: a set of worker threads plus one dedicated log
# thread.  Threads communicate only through queues: a work queue feeds the
# workers, a result queue collects return values, and a log queue feeds the
# log thread.  The code runs on Python 2.6+ as well as Python 3.

__author__ = "pandaychen"

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3: same API, lowercase module name

import os
import signal
import sys
import threading
import time

# Defaults reproduce the originally hard-coded log location ./log/test_<date>.
DEFAULT_LOG_DIR = './log/'
DEFAULT_LOG_NAME = 'test'

# Private sentinel placed on the log queue to tell the log thread to exit.
_LOG_STOP = object()


def handler(signum=None, frame=None):
    """Signal handler: announce the shutdown and exit with status 1.

    The ``signal`` module invokes handlers with ``(signum, frame)``, so the
    handler must accept two arguments.  Both are optional so that a direct
    ``handler()`` call keeps working.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: Always, with exit code 1.
    """
    print("press CTRL+C to end....")
    sys.exit(1)


def call_function(para):
    """Demo job: sleep five seconds, then return *para* unchanged."""
    time.sleep(5)
    return para


def LoggingFun(t_filename, t_logcontent, logdir=DEFAULT_LOG_DIR):
    """Append one timestamped line to ``<logdir>/<t_filename>_<YYYYMMDD>``.

    The line has the form ``[YYYY-mm-dd HH:MM:SS]<t_logcontent>`` followed by
    a newline.  The directory is created when missing.

    Logging is best effort: a failure is reported on stderr instead of being
    raised, so a broken log destination cannot kill the calling thread.

    Args:
        t_filename: Base name of the log file (the date is appended).
        t_logcontent: Message text to record.
        logdir: Directory that holds the log files.
    """
    newpath = os.path.join(logdir, t_filename + '_' + time.strftime("%Y%m%d"))
    try:
        newcontent = '[%s]%s\n' % (time.strftime("%Y-%m-%d %H:%M:%S"),
                                   t_logcontent)
        if logdir and not os.path.isdir(logdir):
            os.makedirs(logdir)
        # ``with`` closes the handle even when the write itself fails.
        with open(newpath, 'a+') as filehd:
            filehd.write(newcontent)
    except Exception as e:
        # Deliberately broad: the logger is the last line of defence and must
        # never raise, but the failure is made visible rather than swallowed.
        sys.stderr.write("logging to %s failed: %r\n" % (newpath, e))


class LogThread(threading.Thread):
    """Daemon thread that writes every message from a queue to the log file.

    Args:
        logQueue: Queue the messages are read from.
        logname: Base name of the log file passed to ``LoggingFun``.
        logdir: Directory passed to ``LoggingFun``.
        **kwds: Forwarded to ``threading.Thread.__init__``.
    """

    def __init__(self, logQueue, logname=DEFAULT_LOG_NAME,
                 logdir=DEFAULT_LOG_DIR, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.logQueue = logQueue
        self.logname = logname
        self.logdir = logdir
        self.daemon = True

    def stop(self):
        """Ask the thread to exit once all earlier messages are written."""
        self.logQueue.put(_LOG_STOP)

    def run(self):
        """Consume messages until the stop sentinel arrives."""
        while True:
            log = self.logQueue.get()  # blocks until a message is available
            if log is _LOG_STOP:
                break
            if log:
                LoggingFun(self.logname, log, self.logdir)
            else:
                # An empty message is recorded as an idle marker, then the
                # thread pauses for one second.
                LoggingFun(self.logname, "log thread sleep 1s", self.logdir)
                time.sleep(1)


class Worker(threading.Thread):
    """Daemon thread that runs queued jobs until the work queue is empty.

    Args:
        workQueue: Queue of ``(callable, args, kwds)`` tuples to execute.
        resultQueue: Queue that receives each job's return value.
        logQueue: Queue that receives one status message per job.
        threadid: Identifier used in the status messages.
        **kwds: Forwarded to ``threading.Thread.__init__``.
    """

    def __init__(self, workQueue, resultQueue, logQueue, threadid, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workQueue = workQueue
        self.resultQueue = resultQueue
        self.logQueue = logQueue
        self.threadid = threadid

    def run(self):
        """Execute jobs one by one; exit as soon as no job is waiting."""
        prefix = "thread:" + str(self.threadid)
        while True:
            # Only the queue access is guarded by ``Queue.Empty`` so that an
            # exception raised by a job cannot be mistaken for an empty queue.
            try:
                func, args, kwds = self.workQueue.get(False)  # non-blocking
            except Queue.Empty:
                break
            try:
                res = func(*args, **kwds)
            except Exception as e:
                # A failing job is logged and skipped (no result is queued);
                # the worker stays alive for the remaining jobs.
                self.logQueue.put(prefix + " failed,error:" + repr(e))
                continue
            self.logQueue.put(prefix + " done," + "args:" + str(res))
            self.resultQueue.put(res)


class WorkManagerPool(object):
    """Thread pool: creates the workers and the log thread and drives them.

    Workers exit as soon as they find the work queue empty, so every job has
    to be added with ``add_job`` before ``start`` is called.

    Args:
        num_of_workers: Number of worker threads to create.
        logname: Base name of the log file used by the log thread.
        logdir: Directory used by the log thread.
    """

    def __init__(self, num_of_workers=10, logname=DEFAULT_LOG_NAME,
                 logdir=DEFAULT_LOG_DIR):
        self.workQueue = Queue.Queue()    # pending jobs
        self.resultQueue = Queue.Queue()  # job return values
        self.logQueue = Queue.Queue()     # status messages for the log thread
        self.workers = []
        self.logthread = None
        self._logname = logname
        self._logdir = logdir
        self._recruitThreads(num_of_workers)

    def _recruitThreads(self, num_of_workers):
        """Create the worker threads and the log thread without starting them."""
        for i in range(num_of_workers):
            self.workers.append(
                Worker(self.workQueue, self.resultQueue, self.logQueue, i))
        # The log thread never finishes on its own, so it is kept apart from
        # the workers; joining it together with them would block forever.
        self.logthread = LogThread(self.logQueue, self._logname, self._logdir)

    def start(self):
        """Start the log thread and every worker thread."""
        self.logthread.start()
        for w in self.workers:
            w.start()

    def wait_for_complete(self):
        """Block until all workers have exited and all log output is written.

        The worker list is drained while joining.  Afterwards the log thread
        is told to stop and joined, which guarantees that every message
        queued by the workers has been handled before this method returns.
        """
        while self.workers:
            self.workers.pop().join()
        self.logthread.stop()
        self.logthread.join()
        print('All jobs were complete.')

    def add_job(self, callable, *args, **kwds):
        """Queue ``callable(*args, **kwds)`` for execution by a worker."""
        self.workQueue.put((callable, args, kwds))

    def get_result(self, *args, **kwds):
        """Return the next job result; arguments go to ``Queue.get``."""
        return self.resultQueue.get(*args, **kwds)


def main():
    """Run the demo: 1000 five-second jobs on ``argv[1]`` threads (default 10)."""
    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)

    try:
        num_of_threads = int(sys.argv[1])
    except (IndexError, ValueError):
        # No argument given, or not an integer: fall back to the default.
        num_of_threads = 10
    start = time.time()
    workermanagepool = WorkManagerPool(num_of_threads)
    urls = ['http://bbs.qcloud.com'] * 1000
    for i in urls:
        workermanagepool.add_job(call_function, i)

    workermanagepool.start()
    workermanagepool.wait_for_complete()
    print(time.time() - start)


if __name__ == '__main__':
    main()
标签:
原文地址:http://www.cnblogs.com/panada/p/4929067.html