码迷,mamicode.com
首页 > 其他好文 > 详细

统计日志的不同条数

时间:2018-06-11 14:49:41      阅读:146      评论:0      收藏:0      [点我收藏+]

标签:ons   port   mapr   cookie   init   self   size   efault   multi   

import collections
import itertools
import multiprocessing
import bz2

class MapReduce(object):
    def __init__(self,map_func,reduce_func,num_workers=None):
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self,mapped_values):
        partition_data = collections.defaultdict(list)
        for key , value in  mapped_values:
            partition_data[key].append(value)
        return partition_data.items()


    def __call__(self, inputs,chunksize=1):
        mao_response = self.pool.map(self.map_func,inputs,chunksize=chunksize)
        partitioned_data = self.partition(itertools.chain(*mao_response))
        reduce_values = self.pool.map(self.reduce_func,partitioned_data)
        return reduce_values


def mapper_match(one_file):
    output = []
    for line in bz2.BZ2File(one_file).readlines():
        line=line.rstrip().split()
        if line[3] == ‘web‘ and line[5] == ‘0‘:
            output.append((line[4],1))

def reduce_match(item):
    cookie,occurances = item
    return (cookie,sum(occurances))

def mapper_count(item):
    _ , count = item
    return [(count,1)]

def reducer_count(item):
    freq , occurances = item
    return ((freq,sum(occurances)))

import glob
import operator

input_files=‘sssssss‘

mapper = MapReduce(mapper_match,reduce_match)
cokkie_feq = mapper(input_files)
mapper = MapReduce(reducer_count,reducer_count)
cookie_fep = mapper(cokkie_feq)
cookie_fep.sort (key = operator.itemgetter(1),reverse = True)
for key ,value in cookie_fep:
    print(key,value)

  

统计日志的不同条数

标签:ons   port   mapr   cookie   init   self   size   efault   multi   

原文地址:https://www.cnblogs.com/1204guo/p/9166823.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!