码迷,mamicode.com
首页 > 编程语言 > 详细

python实现指定目录下JAVA文件单词计数的多进程版本

时间:2014-10-23 01:22:29      阅读:323      评论:0      收藏:0      [点我收藏+]

标签:des   style   blog   http   color   io   os   ar   java   

    

       要说明的是, 串行版本足够快了, 在我的酷睿双核 debian7.6 下运行只要 0.2s , 简直是难以超越。 多进程版本难以避免大量的进程创建和数据同步与传输开销, 性能反而不如串行版本, 只能作为学习的示例了。 以后再优化吧。

       

#-------------------------------------------------------------------------------
# Name:        wordstat_multiprocessing.py
# Purpose:     count words in the Java files of a given directory using multiprocessing
#
# Author:      qin.shuq
#
# Created:     09/10/2014
# Copyright:   (c) qin.shuq 2014
# Licence:     <your licence>
#-------------------------------------------------------------------------------

import re
import os
import time
import logging
from Queue import Empty
from multiprocessing import Process, Manager, Pool, Pipe, cpu_count

# Maps a level name to the corresponding numeric level of the logging module.
# The keys are string literals; their quotes had been lost, which turned them
# into undefined names.
LOG_LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARN": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}

# CPU count reported by multiprocessing; the process pools below are sized from it.
ncpu = cpu_count()

def initlog(filename):
    """Return a logger that writes INFO-and-above records to *filename*.

    filename -- path of the log file; it is also used as the logger's name.

    Returns the configured logging.Logger. Calling this again with the same
    filename returns the same logger without attaching a second handler.
    """
    # Use a logger named after the file instead of the root logger. With the
    # root logger every call returned the same object, so each call stacked
    # another file handler on it and every record was written to every file.
    logger = logging.getLogger(filename)
    if not logger.handlers:
        hdlr = logging.FileHandler(filename)
        hdlr.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(hdlr)
    logger.setLevel(LOG_LEVELS["INFO"])

    return logger


# Module-level loggers shared by the code below: errlog is used for failure
# reports (errlog.error) and infolog for progress messages (infolog.info).
# Each initlog() call attaches a file handler writing to the named file.
errlog = initlog("error.log")
infolog = initlog("info.log")


class FileObtainer(object):
    """Collect the paths of all files found under a directory tree.

    dirpath        -- root directory to walk.
    fileFilterFunc -- optional predicate called with each file path; only
                      paths for which it returns a true value are kept.
                      None (the default) keeps every file.
    """

    def __init__(self, dirpath, fileFilterFunc=None):
        self.dirpath = dirpath
        self.fileFilterFunc = fileFilterFunc

    def findAllFilesInDir(self):
        """Return a list of file paths under dirpath that pass the filter.

        Paths are built with os.path.join, so a trailing separator on
        dirpath does not produce a doubled separator. A directory that
        does not exist yields an empty list, because os.walk ignores
        listing errors by default.
        """
        files = []
        for path, _dirs, filenames in os.walk(self.dirpath):
            for filename in filenames:
                files.append(os.path.join(path, filename))

        if self.fileFilterFunc is None:
            return files
        # A comprehension rather than filter() so callers always get a list
        # (they take len() of the result), whatever the Python version.
        return [f for f in files if self.fileFilterFunc(f)]

class MultiQueue(object):
    """A group of manager-backed queues behind a single put/get interface.

    qnum    -- number of underlying queues to create.
    timeout -- seconds get() waits on each queue before trying the next one.
    """

    def __init__(self, qnum, timeout):
        manager = Manager()
        self.timeout = timeout
        self.qnum = qnum
        # Index of the queue that receives the next put().
        self.pindex = 0
        self.queues = [manager.Queue() for _ in range(self.qnum)]

    def put(self, obj):
        """Append obj to the current queue, then advance round-robin."""
        self.queues[self.pindex].put(obj)
        self.pindex = (self.pindex + 1) % self.qnum

    def get(self):
        """Return the first item available, scanning queues in index order.

        Each queue is waited on for up to `timeout` seconds. A queue that
        stays empty is reported on stdout and in the error log, and the next
        queue is tried. Returns None when every queue timed out, so callers
        cannot distinguish that from a queued None.
        """
        for queue in self.queues:
            try:
                return queue.get(True, self.timeout)
            except Empty as emp:
                print("Not Get.")
                errlog.error("In WordReading: " + str(emp))
        return None

def readFile(filename):
    """Read a text file and return its lines.

    filename -- path of the file to read.

    Returns the list produced by readlines() (line endings kept). If the
    file cannot be opened or read (IOError), the failure is written to the
    error log and an empty list is returned instead of raising.
    """
    try:
        # The with-block closes the file even when readlines() fails.
        with open(filename, "r") as f:
            lines = f.readlines()
    except IOError:
        errlog.error("file %s Not found \n" % filename)
        return []

    infolog.info("[successful read file %s]\n" % filename)
    return lines

def batchReadFiles(fileList, ioPool, mq):
    """Read every file in fileList through ioPool and queue the lines.

    fileList -- iterable of file paths.
    ioPool   -- pool object providing apply_async(); one readFile task is
                submitted per path.
    mq       -- object providing put(); it receives a single list holding
                the lines of all files, in fileList order.
    """
    # Submit every read first so they can overlap, then collect in order.
    pending = [ioPool.apply_async(readFile, args=(name,)) for name in fileList]

    merged = []
    for handle in pending:
        merged += handle.get()
    mq.put(merged)


class WordReading(object):
    """Reading stage: loads the given files and pushes their lines to a queue.

    allFiles -- paths of the files to read.
    mq       -- queue-like object (put()) that receives the lines.
    """

    def __init__(self, allFiles, mq):
        self.allFiles = allFiles
        self.mq = mq
        # Pool sized at three processes per CPU for the file reads.
        self.ioPool = Pool(ncpu * 3)
        infolog.info("WordReading Initialized")

    def run(self):
        """Read all files through the pool and put their lines on the queue."""
        # Uses the instance's own file list; the previous code read a global
        # `allFiles` into an unused local, which fails outside the script.
        batchReadFiles(self.allFiles, self.ioPool, self.mq)

def processLines(lines):
    """Count the words in a list of text lines.

    lines -- iterable of strings; they are concatenated without a separator
             before matching.

    Returns a dict mapping each word matched by WordAnalyzing.wordRegex to
    its number of occurrences (empty when nothing matches).
    """
    result = {}
    linesContent = "".join(lines)
    for word in WordAnalyzing.wordRegex.findall(linesContent):
        result[word] = result.get(word, 0) + 1
    return result

def mergeToSrcMap(srcMap, destMap):
    """Add the counts of destMap into srcMap and return srcMap.

    srcMap  -- dict of word -> count; modified in place (it is the
               accumulator, despite its name).
    destMap -- dict of word -> count to add; not modified.

    Returns the same srcMap object.
    """
    # items() instead of iteritems() so this runs on Python 2 and 3 alike.
    for key, value in destMap.items():
        srcMap[key] = srcMap.get(key, 0) + value
    return srcMap

class WordAnalyzing(object):
    """Analysis stage: counts words in the line batches taken from a queue.

    The result is a map of word -> number of occurrences, available from
    obtainResult() after run() has finished.

    mq   -- queue-like object whose get() returns a batch of lines, or None
            when there is nothing left.
    conn -- connection object; run() sends "OK" on it and closes it when done.
    """

    # A word is a maximal run of \w characters.
    wordRegex = re.compile(r"[\w]+")

    def __init__(self, mq, conn):
        self.mq = mq
        self.cpuPool = Pool(ncpu)
        self.conn = conn
        self.resultMap = {}

        infolog.info("WordAnalyzing Initialized")

    def run(self):
        """Drain the queue, count words in a process pool, merge the counts.

        Prints the elapsed time, then signals completion by sending "OK" on
        the connection and closing it.
        """
        starttime = time.time()
        futureResult = []
        # Submit one counting task per batch until the queue reports empty.
        while True:
            lines = self.mq.get()
            if lines is None:
                break
            futureResult.append(
                self.cpuPool.apply_async(processLines, args=(lines,)))

        for res in futureResult:
            mergeToSrcMap(self.resultMap, res.get())
        endtime = time.time()
        print("WordAnalyzing analyze cost: %s ms"
              % ((endtime - starttime) * 1000))

        self.conn.send("OK")
        self.conn.close()

    def obtainResult(self):
        """Return the accumulated word -> count map."""
        return self.resultMap


class PostProcessing(object):
    """Report helper for a word -> count map.

    resultMap -- dict mapping each word to its number of occurrences.
    """

    def __init__(self, resultMap):
        self.resultMap = resultMap

    def sortByValue(self):
        """Return the (word, count) pairs sorted by count, highest first."""
        return sorted(self.resultMap.items(), key=lambda e: e[1], reverse=True)

    def obtainTopN(self, topN):
        """Print the topN most frequent words, one "<word> counts: <n>" per line.

        When topN exceeds the number of distinct words, all of them are
        printed; zero or a negative topN prints nothing.
        """
        sortedResult = self.sortByValue()
        # max() keeps a negative topN from turning into an "all but last" slice.
        for word, count in sortedResult[:max(topN, 0)]:
            print("%s counts: %s" % (word, count))

if __name__ == "__main__":
    # Script entry point: find the .java files under dirpath, read them,
    # count their words and print the 30 most frequent ones.

    dirpath = "/home/lovesqcc/workspace/java/javastudy/src/"

    if not os.path.exists(dirpath):
        print("dir %s not found." % dirpath)
        raise SystemExit(1)

    fileObtainer = FileObtainer(dirpath, lambda f: f.endswith(".java"))
    allFiles = fileObtainer.findAllFilesInDir()

    # Seconds the analyzer waits on an empty queue before it stops draining.
    mqTimeout = 0.01
    mqNum = 1

    mq = MultiQueue(mqNum, timeout=mqTimeout)
    p_conn, c_conn = Pipe()
    wr = WordReading(allFiles, mq)
    wa = WordAnalyzing(mq, c_conn)

    # The two stages run one after the other in this process; the
    # parallelism is inside their process pools.
    wr.run()
    wa.run()

    # Receive the "OK" that WordAnalyzing.run() sends when it has finished.
    p_conn.recv()

    # taking less time, parallel not needed.
    postproc = PostProcessing(wa.obtainResult())
    postproc.obtainTopN(30)

    print("exit the program.")

 

python实现指定目录下JAVA文件单词计数的多进程版本

标签:des   style   blog   http   color   io   os   ar   java   

原文地址:http://www.cnblogs.com/lovesqcc/p/4044777.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!