码迷,mamicode.com
首页 > 其他好文 > 详细

《Hadoop权威指南》笔记 第二章 Hadoop Streaming

时间:2015-04-05 11:51:12      阅读:146      评论:0      收藏:0      [点我收藏+]

标签:

什么是Hadoop Streaming

? ?

Hadoop提供的一个编程工具,允许用户使用任何可执行文件或脚本作为mapperReducer

? ?

一个例子(shell简洁版本)

? ?

$HADOOP_HOME/bin/hadoop jar

$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar

-input myInputDirs

-output myOutputDir

-mapper cat

-reducer wc

? ?

解析:

? ?

首先找到Hadoop Streaming所在的包

然后定义输入输出路径

然后定义mapperreducer

? ?

这里是用shell中的cat作为mapperwc作为reducer

? ?

同一个例子(shell详细版),每行可能有多个单词

? ?

mapper.sh

? ?

#! /bin/bash

while read LINE; do

for word in $LINE

do

echo "$word 1"

done

done

? ?

reducer.sh

? ?

#! /bin/bash

count=0

started=0

word=""

while read LINE;do

newword=`echo $LINE | cut -d ‘ ‘ -f 1`

if [ "$word" != "$newword" ];then

[ $started -ne 0 ] && echo "$word\t$count"

word=$newword

count=1

started=1

else

count=$(( $count + 1 ))

fi

done

echo "$word\t$count"

? ?

同一个例子(python)

? ?

map

? ?

#!/usr/bin/env python

?

import sys

?

# maps words to their counts

word2count = {}

?

# input comes from STDIN (standard input)

for line in sys.stdin:

# remove leading and trailing whitespace

line = line.strip()

# split the line into words while removing any empty strings

words = filter(lambda word: word, line.split())

# increase counters

for word in words:

# write the results to STDOUT (standard output);

# what we output here will be the input for the

# Reduce step, i.e. the input for reducer.py

#

# tab-delimited; the trivial word count is 1

print ‘%s\t%s‘ % (word, 1)

? ?

reducer

? ?

#!/usr/bin/env python

?

from operator import itemgetter

import sys

?

# maps words to their counts

word2count = {}

?

# input comes from STDIN

for line in sys.stdin:

# remove leading and trailing whitespace

line = line.strip()

?

# parse the input we got from mapper.py

word, count = line.split()

# convert count (currently a string) to int

try:

count = int(count)

word2count[word] = word2count.get(word, 0) + count

except ValueError:

# count was not a number, so silently

# ignore/discard this line

pass

?

# sort the words lexigraphically;

#

# this step is NOT required, we just do it so that our

# final output will look more like the official Hadoop

# word count examples

sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

?

# write the results to STDOUT (standard output)

for word, count in sorted_word2count:

print ‘%s\t%s‘% (word, count)

? ?

测试:

? ?

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar

-input myInputDirs

-output myOutputDir

-mapper Mapper.py

-reducer Reducerr.py

-file Mapper.py

-file Reducer.py

? ?

或者本地测试

? ?

cat input.txt | python Mapper.py | sort | python Reducer.py

? ?

书中的例子(ruby版本)

? ?

Ruby版本

? ?

map函数:

? ?

#! /usr/bin/env ruby

STDIN.each_line do |line|

var = line

year, temp, q = val[15,4], val[87,5], val[92,1]

puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)

end

? ?

技术分享

? ?

技术分享

? ?

reduce函数:

? ?

#! /usr/bin/env ruby

? ?

last_key, max_val = nil, 0

STDIN.each_line do |line|

key, val = line.split("\t")

if last_key && last_key != key

puts "#{last_key}\t#{max_val}"

last_key, max_val = key, val.to_i

else

last_key, max_val = key, [max_val, val.to_i].max

end

end

puts "#{last_key}\t#{max_val}" if last_key

? ?

reduce的输入是map的输出, 并且已经被Hadoop按照key排过序了.

? ?

所以, 如果 last_key 不为空, 可以代码还没有到最后; last_key!=key, 就是说这个key是一个新key. 打印之前key一条记录, 因为之前的key所有值都处理完了.

? ?

并且更新last_key, 最大值就是val.

? ?

如果是重复的key, 就和存储的最大值进行比较.

? ?

最后再打印最后一个K/V pair.

? ?

技术分享

? ?

测试

? ?

技术分享

? ?

书中的例子(python版本)

? ?

技术分享

? ?

技术分享

Hadoop Streaming编程原理

? ?

mapperreducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出,Streaming工具会创建MapReduce job,发送给各个taskTracker,同时监控整个job的执行过程

? ?

如果一个文件(可执行文件或者脚本)作为mappermapper初始化时,每一个mapper任务会把文件作为一个单独的进程启动

? ?

mapper任务运行时,把输入切分成行,然后把每一行提供给可执行文件进程的标准输入。同时mapper收集可执行文件进程的标准输出内容,并把收到的每一行内容转化为key/value对作为mapper的输出。

? ?

默认情况下,一行的第一个tab之前的作为key,后面的作为value

? ?

如果没有tab,整行作为keyvalue为空

? ?

用法

? ?

Hadoop jar + Hadoop Streaming jar + option

? ?

option有:

? ?

-input

-output

-mapper

-reducer

-file:打包文件到提交的作业中,可以使mapper或者reducer要用的输入文件,如配置文件,字典等

-partitioner

-combiner

-D:作业的一些属性,以前用的是-jobconf

? ?

mapred.map.tasksmap task的数目

mapred.reduce.tasks

stream.map.input.field.separator/stream.map.output.field.separatormap输入输出的分隔符,默认为\t

? ?

本地测试:

? ?

cat input.txt|python Mapper.py|sort|python Reducer.py

? ?

或者

? ?

cat input.txt|./Mapper|sort|./Reducer

? ?

摘自

? ?

http://dongxicheng.org/mapreduce/hadoop-streaming-programming/

《Hadoop权威指南》笔记 第二章 Hadoop Streaming

标签:

原文地址:http://www.cnblogs.com/keedor/p/4393717.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!