码迷,mamicode.com
首页 > 编程语言 > 详细

Hadoop Streaming例子(python)

时间:2014-11-23 13:07:40      阅读:270      评论:0      收藏:0      [点我收藏+]

标签:style   blog   ar   color   os   使用   sp   java   for   

  以前总是用java写一些MapReduce程序现举一个例子使用Python通过Hadoop Streaming来实现Mapreduce。

  任务描述:

  HDFS上有两个目录/a和/b,里面数据均有3列,第一列都是id,第二列是各自的业务类型(这里假设/a对应a,/b对应b),第三列是一个json串。各举一例:

  /a的一行:1234567  a  {"name":"jiufeng","age":"27","sex":"male","school":"","status":["111","000","001"],...}

  /b的一行:12345  b  {"a":"abc","b":"adr","xxoo":"e",...}

  要查找在/a中出现"status"且有"111"状态,而且要再/b中有这个id的所有id列表。

  那么来吧,首先需要mapper来提取/a中满足"status"有"111"状态的id和第二列"a"、/b中所有行的前两列,python代码如下,mapper.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3 
 4 import json
 5 import sys
 6 import traceback
 7 import datetime,time
 8 
 9 def mapper():
10     for line in sys.stdin:
11         line = line.strip()
12         id,tag,content = line.split(\t)
13         if tag == a:
14             jstr = json.loads(content)
15             active = jstr.get(status,[])
16             if "111" in active:
17                 print %s\t%s %(id,tag)
18         if tag == b:
19             print %s\t%s % ( id,tag)
20 
21 if __name__ == __main__:
22     mapper()

  这个mapper是从表中输入中提取数据,然后将满足条件的数据通过标准输出。然后是reducer.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3 
 4 import sys
 5 import json
 6 
 7 def reducer():
 8     tag_a = 0
 9     tag_b = 0
10     pre_id = ‘‘
11     for line in sys.stdin:
12         line = line.strip()
13         current_id,tag = line.split(\t)
14         if current_id != pre_id:
15             if tag_a==1 and tag_b==1:
16                 tag_a = 0
17                 tag_b = 0
18                 print %s % pre_id
19             else :
20                 tag_a = 0
21                 tag_b = 0
22         pre_id = current_id
23         if tag == a:
24             if tag_a == 0:
25                 tag_a = 1
26         if tag == b:
27             if tag_b == 0:
28                 tag_b = 1
29     if tag_b==1 and tag_b==1:
30         print %s % pre_id
31 
32 if __name__ == __main__:
33     reducer()

  一个reducer可以接受N多行数据,不像java那样的一行对应一个key然后多个value,而是一个key对应一个value,但好在相同key的行都是连续的,只要在key变化的时候做一下处理就行。

  然后安排让hadoop执行,schedule.py:

 1 #!/usr/bin/env python
 2 #coding = utf-8
 3 
 4 import subprocess, os
 5 import datetime
 6 
 7 
 8 def mr_job():
 9     mypath = os.path.dirname(os.path.abspath(__file__))
10     inputpath1 = /b/*
11     inputpath2 = /a/*
12     outputpath = /out/
13     mapper = mypath + /mapper.py
14     reducer = mypath + /reducer.py
15     cmds = [$HADOOP_HOME/bin/hadoop, jar, $HADOOP_HOME/contrib/streaming/hadoop-streaming-1.2.1.jar,
16             -numReduceTasks, 40,
17             -input, inputpath1,
18             -input, inputpath2,
19             -output, outputpath,
20             -mapper, mapper,
21             -reducer, reducer,
22             -file, mapper,
23             -file, reducer,]
24     for f in os.listdir(mypath):
25         cmds.append(mypath + / + f)
26     cmd = [$HADOOP_HOME/bin/hadoop, fs, -rmr, outputpath]
27     subprocess.call(cmd)
28     subprocess.call(cmds)
29 
30 
31 def main():
32     mr_job()
33 
34 if __name__ == __main__:
35     main()

  schedule.py就是执行MapReduce的地方通过调用hadoop-streamingXXX.jar会通过调用shell命令来提交job,另外可以配置一下参数,shell命令会将制定的文件上传到hdfs然后分发到各个节点执行。。。$HADOOP_HOME就是hadoop的安装目录。。。mapper和reducer的python脚本的名字无所谓,方法名无所谓因为在配置shell执行命令时已经指定了

 

  上述是一个很简单的python_hadoop-streamingXXX例子。。。。

 

Hadoop Streaming例子(python)

标签:style   blog   ar   color   os   使用   sp   java   for   

原文地址:http://www.cnblogs.com/lxf20061900/p/4116379.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!