码迷,mamicode.com
首页 > 编程语言 > 详细

使用python+hadoop-streaming编写hadoop处理程序

时间:2016-08-18 19:51:14      阅读:361      评论:0      收藏:0      [点我收藏+]

标签:

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据

好吧我承认以上这句是抄的以下是原创干货

首先部署hadoop环境,这点可以参考 http://www.powerxing.com/install-hadoop-in-centos/

好吧原创从下一行开始

部署hadoop完成后,需要下载hadoop-streaming包,这个可以到http://www.java2s.com/Code/JarDownload/hadoop-streaming/hadoop-streaming-0.23.6.jar.zip去下载,或者访问http://www.java2s.com/Code/JarDownload/hadoop-streaming/选择最新版本,千万不要选择source否则后果自负,选择编译好的jar包即可,放到/usr/local/hadoop目录下备用

接下来是选择大数据统计的样本,我在阿里的天池大数据竞赛网站下载了母婴类购买统计数据,记录了900+个萌萌哒小baby的购买用户名、出生日期和性别信息,天池的地址https://tianchi.shuju.aliyun.com/datalab/index.htm

数据是一个csv文件,结构如下:

用户名,出生日期,性别(0女,1男,2不愿意透露性别)

比如:415971,20121111,0(数据已经脱敏处理)

下面我们来试着统计每年的男女婴人数

接下来开始写mapper程序mapper.py,由于hadoop-streaming是基于Unix Pipe的,数据会从标准输入sys.stdin输入,所以输入就写sys.stdin

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

for line in sys.stdin:
    line = line.strip()
    data = line.split(,)
    if len(data)<3:
        continue
    user_id = data[0]
    birthyear = data[1][0:4]
    gender = data[2]
    print >>sys.stdout,"%s\t%s"%(birthyear,gender)

一个很简单的程序,看不懂的请自行提高姿势水平

下面是reduce程序,这里大家需要注意一下,map到reduce的期间,hadoop会自动给map出的key排序,所以到reduce中是一个已经排序的键值对,这简化了我们的编程工作

我是有洪荒之力的reducer.py,和外面的哪些妖艳贱货不一样

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

gender_totle = {0:0,1:0}
prev_key = False
for line in sys.stdin:#map的时候map中的key会被排序
    line = line.strip()    
    data = line.split(\t)
    birthyear = data[0]
    curr_key = birthyear
    gender = data[1]
    
    #寻找边界,输出结果
    if prev_key and curr_key !=prev_key:#不是第一次,并且找到了边界
        print >>sys.stdout,"%s year has male %s and female %s"%(prev_key,gender_totle[0],gender_totle[1])#先输出上一次统计的结果
        prev_key = curr_key
        gender_totle[0] = 0
        gender_totle[1] = 0
        gender_totle[2] = 0#清零
        gender_totle[gender] +=1#开始计数
    else:
        prev_key = curr_key
        gender_totle[gender] += 1

接下来就是将样本和mapper reducer上传到hdfs中并执行了,这也是我踩坑的地方

首先要在hdfs中创建相应的目录,为了方便,我将一部分hadoop命令做了别名

alias stop-dfs=/usr/local/hadoop/sbin/stop-dfs.sh
alias start-dfs=/usr/local/hadoop/sbin/start-dfs.sh
alias dfs=/usr/local/hadoop/bin/hdfs dfs

启动hadoop后,先创建一个用户目录

dfs -mkdir -p /user/root

将样本上传到此目录中

dfs -put ./sample.csv /user/root

当然也可以这样处理更加规范,这两者的差别一会儿会说

dfs -mkdir -p /user/root/input
dfs -put ./sample.csv /user/root/input

接下来将mapper.py和reducer.py上传到服务器上,切换到上传以上两个文件的目录,建立一个input子目录,并将上述文件拷贝进去,我也不知道为什么一定要这样,总之照做就对了,等我想明白了会来更新的不要在意这种细节

然后就可以执行了,执行命令如下

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar  -input sample.csv  -output output-streaming  -mapper mapper.py  -combiner reducer.py  -reducer reducer.py  -jobconf mapred.reduce.tasks=10  -file mapper.py  -file reducer.py

如果是将sample.csv放到input下,这个命令就英爱这么写,不过反正我也没试过,出错了不关我的事

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar  -input input/sample.csv  -output output-streaming  -mapper mapper.py  -combiner reducer.py  -reducer reducer.py  -jobconf mapred.reduce.tasks=10  -file mapper.py  -file reducer.py

接下来就是激动人心的一刻了,要非常用力地跪着按下enter键

如果有报错output-streaming already exists就用命令dfs -rm -R /user/root/output-streaming 然后跳起来按下enter键

即使出现奇怪的刷屏也不要惊奇恩妈妈是这么教我的

如果出现以下字样就是成功了

16/08/18 18:35:20 INFO mapreduce.Job:  map 100% reduce 100%
16/08/18 18:35:20 INFO mapreduce.Job: Job job_local926114196_0001 completed successfully

 

使用python+hadoop-streaming编写hadoop处理程序

标签:

原文地址:http://www.cnblogs.com/wuxie1989/p/5785093.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!