码迷,mamicode.com
首页 > 数据库 > 详细

mrjob 使用 mongoldb 数据源【转】

时间:2015-11-05 15:12:29      阅读:314      评论:0      收藏:0      [点我收藏+]

标签:

最近做的事情是用mrjob写mapreduce程序,从mongo读取数据。我的做法很容易也很好懂,因为mrjob可以支持sys.stdin的读取,所以我考虑用一个python程序读mongo中的数据,然后同时让mrjob脚本接受输入,处理,输出。

 

具体方式:

readInMongoDB.py:

 

#coding:UTF-8
‘‘‘
Created on 2014年5月28日

@author: hao
‘‘‘
import pymongo
pyconn = pymongo.Connection(host,port=27017)
pycursor = pyconn.userid_cid_score.find().batch_size(30)
for i in pycursor:
    userId = i[‘userId‘]
    cid = i[‘cid‘]
    score = i[‘score‘]
#     temp = list()
#     temp.append(userId)
#     temp.append(cid)
#     temp.append(score)
    print str(userId)+‘,‘+str(cid)+‘,‘+str(score)

 

 

step1.py:

 

#coding:UTF-8
‘‘‘
Created on 2014年5月27日

@author: hao
‘‘‘
from mrjob.job import MRJob
# from mrjob import protocol
import pymongo
import logging
import simplejson as sj

class step(MRJob):
    ‘‘‘
    ‘‘‘
#     logging.c
    def parseMatrix(self, _, line):
        ‘‘‘
        input one stdin for pymongo onetime search
        output contentId, (userId, rating)
        ‘‘‘
        line = (str(line))
        line=line.split(,)
        userId = line[0]
#         print userId
        cid = line[1]
#         print cid
        score = float(line[2])
#         print score
        yield cid, (userId, float(score))        

    
    def scoreCombine(self, cid, userRating):
        ‘‘‘
        将对同一个内容的(用户,评分)拼到一个list里
        ‘‘‘
        userRatings = list()
        for i in userRating:
            userRatings.append(i)
        yield cid, userRatings
        
    def userBehavior(self, cid, userRatings):
        ‘‘‘        
        ‘‘‘
        scoreList = list()
        for doc in userRatings:
            # 每个combiner结果
            for i in doc:
                scoreList.append(i)
        for user1 in scoreList:
            for user2 in scoreList:
                if user1[0] == user2[0]:
                    continue
                yield (user1[0], user2[0]), (user1[1], user2[1])
    
    
    def steps(self):
        return [self.mr(mapper = self.parseMatrix,
                        reducer = self.scoreCombine),
                self.mr(reducer = self.userBehavior),]
    
    
if __name__==__main__:
    
    fp = open(a.txt,w)
    fp.write([)
    step.run()
    fp.write(])
    fp.close()

 

 

然后执行脚本  python readInMongoDB.py | python step1.py >> out.txt 

这个方式在本地执行的非常好,没有任何问题(除开mrjob速度的问题,其实在本次应用中影响不大)

 

原文:http://blog.csdn.net/whzhcahzxh/article/details/29587059

mrjob 使用 mongoldb 数据源【转】

标签:

原文地址:http://www.cnblogs.com/olivetree123/p/4939429.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!