标签:
首先是一些术语的说明。MapReduce作业(job)是客户端执行的单位:它包括输入数据、MapReduce程序和配置信息。Hadoop通过把作业分成若干个小任务(task)来工作,其包括两种类型的任务:map任务和reduce任务。
有两种类型的节点控制着作业执行过程:jobtracker和多个tasktracker。jobtracker通过调度任务在tasktracker上运行,来协调所有运行在系统上的作业。Tasktracker运行任务的同时,把进度报告传送到jobtracker,jobtracker则记录着每项任务的整体进展情况。如果其中一个任务失败,jobtracker可以重新调度任务到另外一个tasktracker。Hadoop把输入数据划分成等长的小数据发送到MapReduce,称为输入分片(input split)或分片。Hadoop为每个分片(split)创建一个map任务,由它来运行用户自定义的map函数来分析每个分片中的记录。
拥有许多分片就意味着处理每个分片的时间与处理整个输入的时间相比是比较小的。因此,如果我们并行处理每个分片,且分片是小块的数据,那么处理过程将有一个更好的负载平衡,因为更快的计算机将能够比一台速度较慢的机器在作业过程中处理完比例更多的数据分片。即使是相同的机器,没有处理的或其他同时运行的作业也会使负载平衡得以实现,并且在分片变得更细时,负载平衡质量也会更佳。
另一方面,如果分片太小,那么管理分片的总时间和map任务创建的总时间将决定作业的执行的总时间。对于大多数作业,一个理想的分片大小往往是一个HDFS块的大小,默认是64 MB,虽然这可以根据集群进行调整(对于所有新建文件)或在新建每个文件时具体进行指定。
map任务的执行节点和输入数据的存储节点是同一节点,Hadoop的性能达到最佳。这就是所谓的data locality optimization(数据局部性优化)。现在我们应该清楚为什么最佳分片的大小与块大小相同:它是最大的可保证存储在单个节点上的数据量。如果分区跨越两个块,那么对于任何一个HDFS节点而言,基本不可能同时存储这两数据块,因此此分布的某部分必须通过网络传输到节点,这与使用本地数据运行map任务相比,显然效率更低。
map任务把输出写入本地硬盘,而不是HDFS。这是为什么?因为map的输出作为中间输出:而中间输出则被reduce任务处理后产生最终的输出,一旦作业完成,map的输出就可以删除了。因此,把它及其副本存储在HDFS中,难免有些小题大做。如果该节点上运行的map任务在map输出给reduce任务处理之前崩溃,那么Hadoop将在另一个节点上重新运行map任务以再次创建map的输出。
reduce任务并不具备数据本地读取的优势-- 一个单一的reduce任务的输入往往来自于所有mapper的输出。在本例中,我们有一个单独的reduce任务,其输入是由所有map任务的输出组成的。因此,有序map的输出必须通过网络传输到reduce任务运行的节点,并在那里进行合并,然后传递到用户定义的reduce函数中。为增加其可靠性,reduce的输出通常存储在HDFS中。如第3章所述,对于每个reduce输出的HDFS块,第一个副本存储在本地节点上,其他副本存储在其他机架节点中。因此,编写reduce的输出确实十分占用网络带宽,但是只和正常的HDFS写管线的消耗一样。
一个单一的reduce 任务的整个数据流如图2-2所示。虚线框表示节点,虚线箭头表示数据传输到一个节点上,而实线的箭头表示节点之间的数据传输。
reduce任务的数目并不是由输入的大小来决定,而是单独具体指定的。在第7章的7.1节中,将介绍如何为一个给定的作业选择reduce任务数量。
如果有多个reducer,map任务会对其输出进行分区,为每个reduce任务创建一个分区(partition)。每个分区包含许多键(及其关联的值),但每个键的记录都在同一个分区中。分区可以通过用户定义的partitioner来控制,但通常是用默认的分区工具,它使用的是hash函数来形成"木桶"键/值,这种方法效率很高。
一般情况下,多个reduce任务的数据流如图2-3所示。此图清楚地表明了map和reduce任务之间的数据流为什么要称为"shuffle"(洗牌),因为每个reduce任务的输入都由许多map任务来提供。shuffle其实比此图所显示的更复杂,并且调整它可能会对作业的执行时间产生很大的影响,详见6.4节。
|
(点击查看大图)图2-2:MapReduce中单一reduce任务的数据流图 |
|
(点击查看大图)图2-3:多个reduce任务的MapReduce数据流 |
2.4.1 数据流(2)
最后,也有可能不存在reduce任务,不需要shuffle的时候,这样的情况是可能的,因为处理可以并行进行(第7章有几个例子讨论了这个问题)。在这种情况下,唯一的非本地节点数据传输是当map任务写入到HDFS中(见图2-4)。
|
图2-4:MapReduce中没有reduce任务的数据流 |
集群的可用带宽限制了MapReduce作业的数量,因此map和reduce任务之间数据传输的代价是最小的。Hadoop允许用户声明一个combiner,运行在map的输出上-- 该函数的输出作为reduce函数的输入。由于combiner是一个优化方法,所以Hadoop不保证对于某个map的输出记录是否调用该方法,调用该方法多少次。换言之,不调用该方法或者调用该方法多次,reducer的输出结果都一样。
combiner的规则限制着可用的函数类型。我们将用一个例子来巧妙地加以说明。以前面的最高气温例子为例,1950年的读数由两个map处理(因为它们在不同的分片中)。假设第一个map的输出如下:
1. (1950, 0)
2. (1950, 20)
3. (1950, 10)
第二个map的输出如下:
1. (1950, 25)
2. (1950, 15)
reduce函数再调用时被传入以下数字:
1. (1950, [0, 20, 10, 25, 15])
因为25是输入值中的最大数,所以输出如下:
1. (1950, 25)
我们可以用combiner,像reduce函数那样,为每个map输出找到最高气温。reduce函数被调用时将被传入如下数值:
1. (1950, [20, 25])
然而,reduce输出的结果和以前一样。更简单地说,我们可以像下面这样,对本例中的气温值进行如下函数调用:
1. max(0, 20, 10, 25, 15) = max(max(0, 20, 10),
max(25, 15)) = max(20, 25) = 25
并非所有函数都有此属性。例如,如果我们计算平均气温,便不能用mean作为combiner,因为:
1. mean(0, 20, 10, 25, 15) = 14
但是:
1. mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
combiner并不能取代reduce函数。(为什么呢?reduce函数仍然需要处理来自不同的map给出的相同键记录。)但它可以帮助减少map和reduce之间的数据传输量,而正因为此,是否在MapReduce作业中使用combiner是需要慎重考虑的。
标签:
原文地址:http://my.oschina.net/crxy/blog/393579