标签:des style blog http color io os ar java
需要说明的是，串行版本已经足够快了：在我的酷睿双核 Debian 7.6 机器上运行只需 0.2s，简直难以超越。多进程版本难以避免大量的进程创建以及数据同步与传输开销，性能反而不如串行版本，因此只能作为学习示例。以后再优化吧。
#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     statistic words in java files of given directory by multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import sys
import time
import logging

try:
    from queue import Empty   # Python 3
except ImportError:
    from Queue import Empty   # Python 2

from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

LOG_LEVELS = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARN': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL,
}

# Number of CPUs reported by the OS; used to size the worker pools.
ncpu = cpu_count()

# Directory scanned when the script is started without arguments.
DEFAULT_DIRPATH = "/home/lovesqcc/workspace/java/javastudy/src/"


def initlog(filename, name=None):
    """Create a logger that writes to *filename* at INFO level.

    Args:
        filename: path of the log file the handler appends to.
        name: logger name; defaults to *filename* so that every log file
            gets its own logger object.

    Returns:
        The configured ``logging.Logger``.
    """
    # A named logger per file: using the root logger for every call would
    # make all callers share one logger and write each message to every file.
    logger = logging.getLogger(filename if name is None else name)
    hdlr = logging.FileHandler(filename)
    hdlr.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS['INFO'])
    return logger


errlog = initlog("error.log")
infolog = initlog("info.log")


class FileObtainer(object):
    """Collect the files below a directory, optionally filtered."""

    def __init__(self, dirpath, fileFilterFunc=None):
        """
        Args:
            dirpath: root directory to walk.
            fileFilterFunc: optional predicate taking a file path; only
                paths for which it returns a true value are kept.
        """
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        """Return a list of paths of all (accepted) files under ``dirpath``."""
        files = []
        for path, _dirs, filenames in os.walk(self.dirpath):
            for filename in filenames:
                files.append(os.path.join(path, filename))
        if self.fileFilterFunc is None:
            return files
        # Build a real list so the result can be measured and re-iterated
        # on Python 3 as well, where filter() returns a one-shot iterator.
        return [f for f in files if self.fileFilterFunc(f)]


class MultiQueue(object):
    """A group of manager-backed queues filled in round-robin order."""

    def __init__(self, qnum, timeout):
        """
        Args:
            qnum: number of underlying queues.
            timeout: seconds ``get`` waits on each queue before moving on.
        """
        self._manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        self.pindex = 0   # index of the queue the next put() goes to
        self.queues = [self._manager.Queue() for _ in range(self.qnum)]

    def put(self, obj):
        """Put *obj* on the current queue and advance to the next queue."""
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self):
        """Return the first object available from any queue.

        Each queue is polled in order for up to ``timeout`` seconds.

        Returns:
            The retrieved object, or ``None`` when every queue timed out.
        """
        for queue in self.queues:
            try:
                return queue.get(True, self.timeout)
            except Empty as emp:
                print('Not Get.')
                errlog.error('In WordReading:' + str(emp))
        return None


def readFile(filename):
    """Return the lines of *filename*, or ``[]`` if it cannot be read."""
    try:
        # The context manager closes the file even if reading fails.
        with open(filename, 'r') as f:
            lines = f.readlines()
    except IOError:
        errlog.error('file %s Not found \n' % filename)
        return []
    infolog.info('[successful read file %s]\n' % filename)
    return lines


def batchReadFiles(fileList, ioPool, mq):
    """Read all files through *ioPool* and put their lines on *mq*.

    The lines of every file are concatenated into one list, which is put
    on the queue as a single item.
    """
    futureResult = [ioPool.apply_async(readFile, args=(filename,))
                    for filename in fileList]
    allLines = []
    for res in futureResult:
        allLines.extend(res.get())
    mq.put(allLines)


class WordReading(object):
    """Reads the given files with a process pool and feeds the queue."""

    def __init__(self, allFiles, mq):
        """
        Args:
            allFiles: paths of the files to read.
            mq: queue-like object with a ``put`` method receiving the lines.
        """
        self.allFiles = allFiles
        self.mq = mq
        self.ioPool = Pool(ncpu * 3)
        infolog.info('WordReading Initialized')

    def run(self):
        """Read every file and put the collected lines on the queue."""
        batchReadFiles(self.allFiles, self.ioPool, self.mq)

    def close(self):
        """Shut down the pool; call once no further ``run`` is needed."""
        self.ioPool.close()
        self.ioPool.join()


def processLines(lines):
    """Count the occurrences of each word in *lines*.

    Args:
        lines: iterable of strings; they are joined without a separator
            before matching, so each line should keep its line ending.

    Returns:
        dict mapping each word to the number of times it occurs.
    """
    result = {}
    for word in WordAnalyzing.wordRegex.findall(''.join(lines)):
        result[word] = result.get(word, 0) + 1
    return result


def mergeToSrcMap(srcMap, destMap):
    """Add the counts of *destMap* into *srcMap* in place; return *srcMap*."""
    for key, value in destMap.items():
        srcMap[key] = srcMap.get(key, 0) + value
    return srcMap


class WordAnalyzing(object):
    '''
    return Map<Word, count> the occurrence times of each word
    '''

    wordRegex = re.compile(r"[\w]+")

    def __init__(self, mq, conn):
        """
        Args:
            mq: queue-like object whose ``get`` returns a list of lines,
                or ``None`` when nothing is left.
            conn: connection used to report completion; 'OK' is sent on it.
        """
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}
        infolog.info('WordAnalyzing Initialized')

    def run(self):
        """Drain the queue, count the words and merge into ``resultMap``.

        Sends 'OK' on the connection and closes it when finished.
        """
        starttime = time.time()
        futureResult = []
        while True:
            lines = self.mq.get()
            if lines is None:   # every queue timed out: nothing left
                break
            futureResult.append(
                self.cpuPool.apply_async(processLines, args=(lines,)))

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print('WordAnalyzing analyze cost:  %s ms'
              % ((endtime - starttime) * 1000))

        self.conn.send('OK')
        self.conn.close()

    def obtainResult(self):
        """Return the merged word -> count map."""
        return self.resultMap

    def close(self):
        """Shut down the pool; call once no further ``run`` is needed."""
        self.cpuPool.close()
        self.cpuPool.join()


class PostProcessing(object):
    """Sorting and reporting on a word -> count map."""

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        """Return ``(word, count)`` pairs sorted by count, highest first."""
        return sorted(self.resultMap.items(), key=lambda e: e[1],
                      reverse=True)

    def obtainTopN(self, topN):
        """Print the *topN* most frequent words (fewer if not available)."""
        # max() keeps a negative topN from turning into a from-the-end slice.
        for word, count in self.sortByValue()[:max(topN, 0)]:
            print('%s  counts:  %s' % (word, count))


def main(dirpath=DEFAULT_DIRPATH, topN=30):
    """Count the words of all ``.java`` files under *dirpath*.

    Prints the *topN* most frequent words. Exits with status 1 when
    *dirpath* does not exist.
    """
    if not os.path.exists(dirpath):
        print('dir %s not found.' % dirpath)
        sys.exit(1)

    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith('.java'))
    allFiles = fileObtainer.findAllFilesInDir()

    mqTimeout = 0.01
    mqNum = 1

    mq = MultiQueue(mqNum, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)

    try:
        wr.run()
        wa.run()

        msg = p_conn.recv()
        if msg == 'OK':
            pass

        # taking less time, parallel not needed.
        postproc = PostProcessing(wa.obtainResult())
        postproc.obtainTopN(topN)
    finally:
        # Release the worker processes even if a step above failed.
        wr.close()
        wa.close()

    print('exit the program.')


if __name__ == "__main__":
    # An optional first command-line argument overrides the default directory.
    main(sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DIRPATH)
标签:des style blog http color io os ar java