码迷,mamicode.com
首页 > 其他好文 > 详细

深入讲解Hadoop管道

时间:2015-03-20 10:59:31      阅读:305      评论:0      收藏:0      [点我收藏+]

标签:hadoop   mapreduce   

Hadoop管道是Hadoop MapReduce的C++接口的代称。与流不同,流使用标准输入和输出让map和reduce节点之间相互交流,管道使用sockets作为tasktracker与C++编写的map或者reduce函数的进程之间的通道。JNI未被使用。

我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用管道来运行它。例2-12显示了用C++语言编写的map函数和reduce函数的源代码。

例2-12:用C++语言编写的最高气温程序

1.  #include <algorithm> 

2.  #include <limits> 

3.  #include <string> 

4.   

5.  #include "hadoop/Pipes.hh" 

6.  #include "hadoop/TemplateFactory.hh" 

7.  #include "hadoop/StringUtils.hh" 

8.   

9.  class MaxTemperatureMapper : public HadoopPipes::Mapper { 

10. public:  

11.   MaxTemperatureMapper(HadoopPipes::TaskContext& context) { 

12.   }  

13.   void map(HadoopPipes::MapContext& context) { 

14.     std::string line = context.getInputValue();  

15.     std::string year = line.substr(15, 4);  

16.     std::string airTemperature = line.substr(87, 5);  

17.     std::string q = line.substr(92, 1); 

18.     if (airTemperature != "+9999" && 

19.        (q == "0" || q == "1" || q == "4" || q == "5" || q == "9")) {  

20.       context.emit(year, airTemperature); 

21.     }  

22.   }  

23. };  

24.  

25.    

26. class MapTemperatureReducer : public HadoopPipes::Reducer { 

27. public:  

28.   MapTemperatureReducer(HadoopPipes::TaskContext& context) { 

29.   }  

30.   void reduce(HadoopPipes::ReduceContext& context) { 

31.     int maxValue = INT_MIN;  

32.     while (context.nextValue()) { 

33.       maxValue = std::max(maxValue,   

34.         HadoopUtils::toInt(context.getInputValue())); 

35.     }  

36.     context.emit(context.getInputKey(),  

37.       HadoopUtils::toString(maxValue)); 

38.   }  

39. };  

40.  

41. int main(int argc, char *argv[]) { 

42.   return HadoopPipes::runTask(HadoopPipes::TemplateFactory 

43.                         <MaxTemperatureMapper,
MapTemperatureReducer>());  

44. } 

此应用程序连接到Hadoop C++库,后者是一个用于与tasktracker子进程进行通信的轻量级的封装器。通过扩展在HadoopPipes命名空间的Mapper和Reducer类且提供map()和reduce()方法的实现,我们便可定义map和reduce函数。这些方法采用了一个上下文对象(MapContext类型或ReduceContext类型),后者提供读取输入和写入输出,通过JobConf类来访问作业配置信息。本例中的处理过程非常类似于Java的处理方式。

与Java接口不同,C++接口中的键和值是字节缓冲,表示为标准模板库(Standard Template Library,STL)的字符串。这使接口变得更简单,尽管它把更重的负担留给了应用程序的开发人员,因为开发人员必须将字符串convert to and from表示to和from两个逆操作。开发人员必须在字符串及其他类型之间进行转换。这一点在MapTemperatureReducer中十分明显,其中,我们必须把输入值转换为整数的输入值(使用HadoopUtils中的便利方法),在最大值被写出之前将其转换为字符串。在某些情况下,我们可以省略这个转化,如在MaxTemperatureMapper中,它的airTemperature值从来不用转换为整数,因为它在map()方法中从来不会被当作数字来处理。

main()方法是应用程序的入口点。它调用HadoopPipes::runTask,连接到从Mapper或Reducer连接到Java的父进程和marshals 数据。runTask()方法被传入一个Factory参数,使其可以创建Mapper或Reducer的实例。它创建的其中一个将受套接字连接中Java父进程控制。我们可以用重载模板factory方法来设置一个combiner(combiner)、partitioner(partitioner)、记录读取函数(record reader)或记录写入函数(record writer)。

编译运行

现在我们可以用Makerfile编译连接例2-13的程序。

例2-13:C++版本的MapReduce程序的Makefile

1.  CC = g++  

2.  CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include 

3.   

4.  max_temperature: max_temperature.cpp  

5.      $(CC) $(CPPFLAGS) $< -Wall   

6.      -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \ 

7.      -lhadooputils -lpthread -g -O2 -o $@ 

在Makefile中应当设置许多环境变量。除了HADOOP_INSTALL(如果遵循附录A的安装说明,应该已经设置好),还需要定义平台,指定操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的机器编译运行了如下内容:

1.  % export PLATFORM=Linux-i386-32 

2.  % make 

在成功完成之前,当前目录中便有max_temperature可执行文件。

要运行管道作业,我们需要在伪分布式(pseudo_distrinuted)模式下(其中所有守护进程运行在本地计算机上)运行Hadoop,其中的安装步骤参见附录A。管道不在独立模式(本地运行)中运行,因为它依赖于Hadoop的分布式缓存机制,仅在HDFS运行时才运行。

Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便它们启动map和reduce任务时,它能够被tasktracker取出:

1.  % hadoop fs -put max_temperature bin/max_temperature 

示例数据也需要从本地文件系统复制到HDFS:

1.  % hadoop fs -put input/ncdc/sample.txt sample.txt 

现在可以运行这个作业。为了使其运行,我们用Hadoop 管道命令,使用-program参数来传递在HDFS中可执行文件的URI。

1.  % hadoop pipes\  

2.    -D hadoop.pipes.java.recordreader\ 

3.    -D hadoop.pipes.java.recordwriter\ 

4.    inpit sample.txt\  

5.    output output  

6.    program bin/max_temperature 

我们使用-D选项来指定两个属性:hadoop.pipes.java.recordreader和hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并没有指定一个C+++记录读取函数或记录写入函数,但我们要使用默认的Java设置(用来设置文本输入和输出)。管道还允许你设置一个Java mapper,reducer,combiner或partitioner。事实上,在任何一个作业中,都可以混合使用Java类或C++类。

结果和用其他语言编写的程序所得的结果一样。

深入讲解Hadoop管道

标签:hadoop   mapreduce   

原文地址:http://blog.csdn.net/crxy2016/article/details/44487761

(1)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!