What is Hadoop Streaming?

A programming tool provided by Hadoop that lets users run any executable or script as the mapper and reducer of a MapReduce job.

An example (concise shell version)

$HADOOP_HOME/bin/hadoop jar \
    $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper cat \
    -reducer wc

Explanation:

First, locate the jar that contains Hadoop Streaming.
Then specify the input and output paths.
Then specify the mapper and reducer.

Here the shell command cat is used as the mapper and wc as the reducer.
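
Conceptually (ignoring input splits and parallelism), the job behaves like this local pipeline, where cat passes each input line through unchanged and wc counts the lines, words, and bytes it receives (a rough analogy; input.txt is a hypothetical sample file):

cat input.txt | cat | sort | wc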

The same example (detailed shell version); each input line may contain multiple words

mapper.sh

#!/bin/bash
# emit "<word> 1" for every word on every input line
while read LINE; do
    for word in $LINE; do
        echo "$word 1"
    done
done

reducer.sh

#!/bin/bash
# streaming reduce input is sorted by key, so identical words arrive together
count=0
started=0
word=""
while read LINE; do
    # the first space-separated field is the word
    newword=`echo $LINE | cut -d ' ' -f 1`
    if [ "$word" != "$newword" ]; then
        # a new word: flush the count of the previous word (if any)
        [ $started -ne 0 ] && echo -e "$word\t$count"
        word=$newword
        count=1
        started=1
    else
        count=$(( $count + 1 ))
    fi
done
# flush the final word
echo -e "$word\t$count"

The same example (Python version)

Mapper.py

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for Reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))

Reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from Mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))

Test:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper Mapper.py \
    -reducer Reducer.py \
    -file Mapper.py \
    -file Reducer.py

Or test locally:

cat input.txt | python Mapper.py | sort | python Reducer.py

The example from the book (Ruby version)

map function:

#!/usr/bin/env ruby
STDIN.each_line do |line|
  val = line
  # fixed-width NCDC record: year at offset 15, temperature at 87, quality code at 92
  year, temp, q = val[15,4], val[87,5], val[92,1]
  # emit year\ttemp, skipping missing readings and bad quality codes
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

reduce function:

#!/usr/bin/env ruby

last_key, max_val = nil, 0
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    # a new key: flush the maximum of the previous key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    # same key (or the very first record): keep the running maximum
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
# flush the last K/V pair
puts "#{last_key}\t#{max_val}" if last_key

The reduce input is the map output, and Hadoop has already sorted it by key.

So if last_key is non-nil, we know this is not the first record; and if last_key != key, the current key is a new one. In that case, print one record for the previous key, since all of that key's values have been processed, and update last_key; the new maximum so far is simply val.

If the key repeats, compare val with the stored maximum.

Finally, print the last K/V pair.

Test
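
A local test in the spirit of the earlier examples (a sketch; sample.txt and the script file names are assumptions, and sort again stands in for Hadoop's shuffle/sort):

cat sample.txt | ruby max_temperature_map.rb | sort | ruby max_temperature_reduce.rb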

The example from the book (Python version)
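
A straightforward Python port of the Ruby map and reduce functions above (a sketch under the same assumptions about the NCDC record layout; the book's actual listing may differ in details):

map function:

#!/usr/bin/env python
# map: extract (year, temperature) from fixed-width NCDC records
import re
import sys

for line in sys.stdin:
    val = line.strip()
    year, temp, q = val[15:19], val[87:92], val[92:93]
    # skip missing readings and bad quality codes
    if temp != "+9999" and re.match("[01459]", q):
        print('%s\t%s' % (year, temp))

reduce function:

#!/usr/bin/env python
# reduce: input is sorted by key, so track the running maximum per year
import sys

last_key, max_val = None, None
for line in sys.stdin:
    key, val = line.strip().split('\t')
    if last_key is None:
        # very first record
        last_key, max_val = key, int(val)
    elif last_key != key:
        # a new key: flush the maximum of the previous key
        print('%s\t%s' % (last_key, max_val))
        last_key, max_val = key, int(val)
    else:
        last_key, max_val = key, max(max_val, int(val))
if last_key:
    print('%s\t%s' % (last_key, max_val))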

How Hadoop Streaming works

The mapper and reducer read user data from standard input, process it line by line, and emit results to standard output. The Streaming tool creates the MapReduce job, submits it to the TaskTrackers, and monitors the whole job as it runs.

If a file (an executable or a script) is used as the mapper, each mapper task launches that file as a separate process when the mapper is initialized.

While the mapper task runs, it splits its input into lines and feeds each line to the standard input of that process. At the same time, the mapper collects the process's standard output and turns each line it receives into a key/value pair, which becomes the mapper's output.

By default, everything before the first tab on a line is the key and everything after it is the value.

If a line contains no tab, the whole line is the key and the value is empty, as the sketch below illustrates.
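
A minimal Python sketch of this default split rule (illustration only; Hadoop implements it internally):

key, _, value = "hello\tworld 1".partition("\t")   # key == "hello", value == "world 1"
key, _, value = "helloworld".partition("\t")       # key == "helloworld", value == ""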

Usage

hadoop jar + the Hadoop Streaming jar + options

The options include:

-input
-output
-mapper
-reducer
-file: ships a file with the submitted job; this can be a file the mapper or reducer needs, such as a configuration file or a dictionary
-partitioner
-combiner
-D: sets job properties (formerly -jobconf), for example:

mapred.map.tasks: the number of map tasks
mapred.reduce.tasks: the number of reduce tasks
stream.map.input.field.separator / stream.map.output.field.separator: the field separator for map input/output, \t by default
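
Putting several options together, a word count submission that also fixes the number of reduce tasks might look roughly like this (a sketch reusing Mapper.py and Reducer.py from above; in most streaming versions the generic -D options must come before the streaming-specific options):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
    -D mapred.reduce.tasks=2 \
    -input myInputDirs \
    -output myOutputDir \
    -mapper Mapper.py \
    -reducer Reducer.py \
    -file Mapper.py \
    -file Reducer.py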

Local test:

cat input.txt | python Mapper.py | sort | python Reducer.py

or

cat input.txt | ./Mapper | sort | ./Reducer

Sources:

http://dongxicheng.org/mapreduce/hadoop-streaming-programming/
Notes on Hadoop: The Definitive Guide (《Hadoop权威指南》), Chapter 2: Hadoop Streaming

Original post: http://www.cnblogs.com/keedor/p/4393717.html