# Tags: ons port mapr cookie init self size efault multi
import collections
import itertools
import multiprocessing
import bz2
class MapReduce(object):
    """Small map/reduce driver that runs both phases on a multiprocessing pool.

    Calling an instance runs ``map_func`` over the inputs, groups the emitted
    ``(key, value)`` pairs by key, then runs ``reduce_func`` once per group.

    The instance owns a ``multiprocessing.Pool``. Release it with ``close()``
    or by using the instance as a context manager::

        with MapReduce(mapper, reducer) as job:
            results = job(inputs)
    """

    def __init__(self, map_func, reduce_func, num_workers=None):
        """Store the two phase functions and start the worker pool.

        Args:
            map_func: called with one input item; must return an iterable of
                ``(key, value)`` pairs.
            reduce_func: called with one ``(key, values)`` pair, where
                ``values`` is the list of everything emitted for ``key``.
            num_workers: passed straight to ``multiprocessing.Pool``; ``None``
                leaves the pool size to the library default.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` body.
        return False

    def close(self):
        """Stop accepting work and wait for the worker processes to exit."""
        self.pool.close()
        self.pool.join()

    def partition(self, mapped_values):
        """Group ``(key, value)`` pairs by key.

        Args:
            mapped_values: iterable of ``(key, value)`` pairs.

        Returns:
            A view of ``(key, [value, ...])`` pairs, in first-seen key order,
            with values in the order they were encountered.
        """
        partition_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partition_data[key].append(value)
        return partition_data.items()

    def __call__(self, inputs, chunksize=1):
        """Run the map phase, group by key, then run the reduce phase.

        Args:
            inputs: iterable of items handed one at a time to ``map_func``.
            chunksize: batch size used when dispatching the map phase.

        Returns:
            A list with one ``reduce_func`` result per distinct key.
        """
        map_responses = self.pool.map(
            self.map_func, inputs, chunksize=chunksize
        )
        # Each map call returns its own list of pairs; flatten them lazily
        # instead of unpacking every list into chain()'s argument tuple.
        partitioned_data = self.partition(
            itertools.chain.from_iterable(map_responses)
        )
        reduce_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduce_values
def mapper_match(one_file):
    """Map step: emit ``(field_4, 1)`` for every matching line of a bz2 log.

    Each line is split on whitespace. A line matches when its fourth field
    (index 3) is ``'web'`` and its sixth field (index 5) is ``'0'``; the
    fifth field (index 4) is then used as the key.

    Args:
        one_file: path of a bz2-compressed text file.

    Returns:
        A list of ``(key, 1)`` tuples, one per matching line, in file order.
    """
    output = []
    # Text mode yields str lines, so the comparisons below work on Python 3
    # (BZ2File yields bytes, which never equal a str). Undecodable bytes are
    # replaced rather than aborting the whole file.
    with bz2.open(one_file, "rt", encoding="utf-8", errors="replace") as log:
        for line in log:
            fields = line.split()
            if len(fields) < 6:
                # Too short to carry the fields we test; skip instead of
                # raising IndexError.
                continue
            if fields[3] == 'web' and fields[5] == '0':
                output.append((fields[4], 1))
    return output
def reduce_match(item):
    """Reduce step: total the values collected for one key.

    Args:
        item: a ``(key, values)`` pair where ``values`` is an iterable of
            numbers.

    Returns:
        ``(key, sum(values))``.
    """
    cookie, occurrences = item
    return (cookie, sum(occurrences))
def mapper_count(item):
    """Map step: re-key a ``(key, count)`` pair by its count.

    Args:
        item: a two-element pair; the first element is ignored.

    Returns:
        A one-element list ``[(count, 1)]``.
    """
    _, count = item
    return [(count, 1)]
def reducer_count(item):
    """Reduce step: total the values collected for one frequency key.

    Args:
        item: a ``(freq, values)`` pair where ``values`` is an iterable of
            numbers.

    Returns:
        ``(freq, sum(values))``.
    """
    freq, occurrences = item
    return (freq, sum(occurrences))
import glob
import operator


def _run_job(map_func, reduce_func, inputs):
    """Run one MapReduce pass and always shut its worker pool down."""
    job = MapReduce(map_func, reduce_func)
    try:
        return job(inputs)
    finally:
        job.pool.close()
        job.pool.join()


def main():
    """Print how many keys occurred N times, most common N first.

    Pass 1 totals matching lines per key across all input files.
    Pass 2 re-keys those totals by count and totals how many keys share
    each count. Output is one ``count number_of_keys`` line per count.
    """
    # 'sssssss' is the placeholder from the original post; replace it with
    # the real glob pattern for the .bz2 logs. Expanding it through glob
    # gives pool.map a list of paths instead of the characters of a string.
    input_files = glob.glob('sssssss')

    cookie_freq = _run_job(mapper_match, reduce_match, input_files)

    # The second pass must map with mapper_count; passing reducer_count as
    # the map function would call sum() on an integer count and fail.
    freq_counts = _run_job(mapper_count, reducer_count, cookie_freq)

    freq_counts.sort(key=operator.itemgetter(1), reverse=True)
    for key, value in freq_counts:
        print(key, value)


# multiprocessing re-imports this module in worker processes on spawn-based
# platforms; the guard keeps workers from re-running the whole job.
if __name__ == "__main__":
    main()
# Tags: ons port mapr cookie init self size efault multi
# Original article: https://www.cnblogs.com/1204guo/p/9166823.html