
A Parallel Summation Example in Python

Posted: 2015-03-04 12:42:44


Let's start with an example. This code was written to evaluate a prediction model; the evaluation metric is described in detail at

https://www.kaggle.com/c/how-much-did-it-rain/details/evaluation

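At its core, it computes

$$C = \frac{1}{70N}\sum_{m=1}^{N}\sum_{n=0}^{69}\left(P_m(y \le n) - H(n - z_m)\right)^2$$

where N is the number of rows, P_m(y ≤ n) is the predicted probability in column n of row m, z_m is the observed value for row m, and H is the Heaviside step function: H(x) = 1 if x ≥ 0, and 0 otherwise.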
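As a sanity check on the formula, here is a literal, deliberately unoptimized transcription of the double sum in plain Python. It is only a sketch: the names `H` and `score` are illustrative, and each row is assumed to hold just the 70 probabilities (no Id column).

```python
def H(x):
    """Heaviside step: 1 if x >= 0, else 0."""
    return 1.0 if x >= 0 else 0.0


def score(probs, actual):
    """C = 1/(70N) * sum over m, n of (P_m(y <= n) - H(n - z_m))^2, as literal loops.

    probs:  N rows, each holding 70 predicted probabilities P_m(y <= n), n = 0..69
    actual: N observed values z_m
    """
    N = len(actual)
    total = 0.0
    for m in range(N):
        for n in range(70):
            total += (probs[m][n] - H(n - actual[m])) ** 2
    return total / (70 * N)


# A prediction that is exactly the step function at z = 10 scores 0.
print(score([[H(n - 10) for n in range(70)]], [10]))
```

Each of the N × 70 terms is evaluated one at a time, which is exactly why the run time grows so quickly with N.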

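In practice the number of rows N is very large (1,126,694), so computing the sum directly in a single process takes a long time (14 min 10 s).

Borrowing the idea behind MapReduce, I therefore tried a multi-process approach: each process computes the partial sum over a slice of the rows, and the partial sums are then added together to obtain C.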
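Stripped of file I/O, the split / partial-sum / add pattern can be sketched with an in-memory list and `multiprocessing.Pool.map`. This is a simplified illustration, not the author's code: `partial_sse`, `make_chunks` and `parallel_score` are hypothetical names, the data is synthetic, and unlike the real script the rows are pickled and sent to the workers instead of each worker reading its own slice from disk.

```python
import multiprocessing
import random


def partial_sse(chunk):
    """Map step: sum of squared errors over one chunk of rows (plain Python)."""
    probs, actual = chunk
    total = 0.0
    for row, z in zip(probs, actual):
        for n, p in enumerate(row):                  # n = 0..69
            total += (p - (1.0 if n >= z else 0.0)) ** 2
    return total


def make_chunks(probs, actual, blocks):
    """Cut the rows into at most `blocks` contiguous slices of (nearly) equal size."""
    size = max(1, -(-len(actual) // blocks))          # ceiling division
    return [(probs[i:i + size], actual[i:i + size])
            for i in range(0, len(actual), size)]


def parallel_score(probs, actual, workers=None):
    """Map partial_sse over the chunks in a process pool, then reduce by summing."""
    workers = workers or multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=workers) as pool:
        partials = pool.map(partial_sse, make_chunks(probs, actual, workers))
    return sum(partials) / (70 * len(actual))


if __name__ == "__main__":
    # Synthetic data only: 1,000 rows of non-decreasing "probabilities".
    random.seed(0)
    actual = [float(random.randrange(70)) for _ in range(1000)]
    probs = [sorted(random.random() for _ in range(70)) for _ in actual]
    print(parallel_score(probs, actual))
```

Because addition is associative, summing the per-chunk partial sums gives the same total as one big loop (up to floating-point rounding). The `if __name__ == "__main__":` guard is required on platforms where `multiprocessing` starts workers by re-importing the main module (the spawn method, the default on Windows and macOS).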

The evaluation code is as follows:

import csv
import sys
import logging
import argparse
import numpy as np
import multiprocessing
import time

# configure logging
logger = logging.getLogger("example")

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s %(name)s: %(message)s'))

logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def H(n, z):
    '''Heaviside step H(n - z): True (1) if n >= z, else False (0).'''
    return (n-z) >= 0

def evaluate(args, start, end):
    '''Return the sum of squared errors over data rows [start, end).'''
    logger.info("Started %d to %d" % (start, end))
    expReader = open('train_exp.csv', 'r')
    expReader.readline()                  # skip the header
    for i in range(start):                # skip rows owned by earlier blocks
        expReader.readline()
    predFile = open(args.predict)
    for i in range(start + 1):            # skip the header and earlier rows
        predFile.readline()
    predReader = csv.reader(predFile, delimiter=',')
    squareErrorSum = 0
    totalLines = end - start
    for i, row in enumerate(predReader):
        if i == totalLines:               # reached the end of this block
            break
        expId, exp = expReader.readline().strip().split(',')
        exp = float(exp)
        predId = row[0]
        row = np.array(row, dtype=float)
        assert expId == predId
        # columns 1..70 hold the predicted probabilities for n = 0..69
        for j in range(1, 71):
            n = j - 1
            squareErrorSum += (row[j] - H(n, exp))**2
    expReader.close()
    predFile.close()
    logger.info("Completed %d to %d, SquareErrorSum: %f" % (start, end, squareErrorSum))
    return squareErrorSum

def fileCmp(args):
    '''Check that both files contain the same number of data rows.'''
    with open('train_exp.csv') as f:
        expLines = sum(1 for _ in f) - 1       # discard header
    with open(args.predict) as f:
        predictLines = sum(1 for _ in f) - 1   # discard header
    print('Lines (exp, predict):', expLines, predictLines)
    assert expLines == predictLines
    evaluate.Lines = expLines

if __name__ == "__main__":
    # parse command-line arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--predict',
                        help=("path to the predicted-probability file, "
                              "e.g. predict_changeTimePeriod.csv"))
    args = parser.parse_args()
    fileCmp(args)
    blocks = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=blocks)
    linesABlock = evaluate.Lines // blocks
    result = []
    for i in range(blocks - 1):
        result.append(pool.apply_async(evaluate, (args, i*linesABlock, (i+1)*linesABlock)))
    # the last block also takes the remaining rows
    result.append(pool.apply_async(evaluate, (args, (blocks-1)*linesABlock, evaluate.Lines)))
    pool.close()
    pool.join()
    result = [res.get() for res in result]
    print(result)
    print(evaluate.Lines)
    score = sum(result) / (70*evaluate.Lines)
    print("score:", score)

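The work is split into as many processes as there are CPU cores, in an attempt to squeeze as much compute out of the CPU as possible. During the run, CPU utilization did indeed stay at 100% the whole time.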
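In the `__main__` block, every process gets `linesABlock` rows and the last block also absorbs the leftover rows. A variant that spreads the remainder one row at a time, so block sizes differ by at most one, might look like the following sketch. `block_ranges` is a hypothetical helper, not part of the original script; it assumes the same contiguous, half-open `[start, end)` row ranges that `evaluate` expects.

```python
def block_ranges(total_rows, blocks):
    """Split rows [0, total_rows) into `blocks` contiguous half-open ranges.

    The first `total_rows % blocks` ranges get one extra row, so sizes
    differ by at most one and the ranges cover every row exactly once.
    """
    base, extra = divmod(total_rows, blocks)
    ranges = []
    start = 0
    for b in range(blocks):
        end = start + base + (1 if b < extra else 0)
        ranges.append((start, end))
        start = end
    return ranges


# Hypothetical 8-core machine with the 1,126,694 rows from the post:
# six blocks of 140,837 rows and two of 140,836.
print(block_ranges(1126694, 8))
```

Each pair could then be passed straight to `pool.apply_async(evaluate, (args, start, end))`. With 1.1 million rows the difference from the original split is negligible; the helper mainly makes the range arithmetic explicit and easy to test.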

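In testing, the result matched the single-process version, and the run time dropped to 11 min 47 s, i.e. 16.8% less time than the single-process run.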
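The 16.8% figure is the reduction in wall-clock time. Expressed as a speedup ratio, the same two timings from the post work out to only about 1.2×, well below the ideal of one times the number of cores:

```python
def to_seconds(minutes, seconds):
    """Convert a minutes:seconds wall-clock time to seconds."""
    return minutes * 60 + seconds


single = to_seconds(14, 10)        # single-process run: 850 s
multi = to_seconds(11, 47)         # one process per core: 707 s
time_saved = (single - multi) / single
speedup = single / multi
print(f"time saved: {time_saved:.1%}, speedup: {speedup:.2f}x")
# prints: time saved: 16.8%, speedup: 1.20x
```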

The improvement was smaller than I had expected.

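I also tried having each process load its own in-memory copy of the input files with StringIO before processing, but that brought no further speedup. Together with the constant 100% CPU utilization, this suggests the bottleneck is raw computation rather than file I/O.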

It seems compute-intensive work still needs to be written in C :)
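The hot spot in `evaluate` is the inner `for j in range(1, 71)` loop, which the interpreter executes 1,126,694 × 70 = 78,868,580 times. One commonly used way to push that loop into compiled code without writing C by hand is NumPy vectorization. The sketch below swaps in that technique: `row_square_error` and `THRESHOLDS` are hypothetical names, and it has not been benchmarked against the timings above.

```python
import numpy as np

# n = 0..69, matching prediction columns 1..70 (column 0 is the Id)
THRESHOLDS = np.arange(70)


def row_square_error(row, exp):
    """Vectorized equivalent of the inner `for j in range(1, 71)` loop in evaluate().

    row: one prediction row, Id in row[0] and probabilities in row[1:71]
    exp: the observed value z for that row
    """
    probs = np.asarray(row[1:71], dtype=float)
    step = (THRESHOLDS - exp >= 0).astype(float)   # H(n - z) for all 70 n at once
    return float(((probs - step) ** 2).sum())
```

Inside `evaluate`, the whole inner loop would then become `squareErrorSum += row_square_error(row, exp)`. The per-row work runs in NumPy's compiled routines instead of 70 interpreted iterations, though some Python overhead per row remains.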


Original post: http://www.cnblogs.com/instant7/p/4312786.html
