标签:
引言 MapReduce作出保证:进入每个Reducer的数据行都是有序的(根据数据行的键值进行排序)。MapReduce将Mapper的输出进行排序并传递给Reducer作为输入的过程称为Shuffle。在很多场景下,Shuffle是整个MapReduce过程的核心,也是“奇迹”发生的地方,如下图所示: 理解Shuffle的执行过程对我们优化MapReduce任务带来帮助。这里以Hadoop 0.20.2代码为基础进行介绍,同时也会涉及到如何扩展MapReduce组件,从而影响Shuffer的行为。 Map Task Map Task产生输出的时候,并不是直接将数据写到本地磁盘,这个过程涉及到两个部分:写缓冲区、预排序。 (1)写缓冲区 每一个Map Task都拥有一个“环形缓冲区”作为Mapper输出的写缓冲区。写缓冲区大小默认为100MB(通过属性io.sort.mb调整),当写缓冲区的数据量达到一定的容量限额时(默认为80%,通过属性io.sort.spill.percent调整),后台线程开始将写缓冲区的数据溢写到本地磁盘。在数据溢写的过程中,只要写缓冲区没有被写满,Mappper依然可以将数据写出到缓冲区中;否则Mapper的执行过程将被阻塞,直到溢写结束。 溢写以循环的方式进行(即写缓冲区的数据量大致限额时就会执行溢写),可以通过属性mapred.local.dir指定写出的目录。 (2)预排序 溢写线程将数据最终写出到本地磁盘之前,首先根据Reducer的数目对这部分数据进行分区(即每一个分区中的数据会被传送至同一个Reducer进行处理,分区数目与Reducer数据保持一致),然后对每一个分区中的数据根据键值进行排序(预排序),如果MapReducer开启Combiner,则对该分区中排序后的数据执行Combine过程。Combine过程的执行“紧凑”了Mapper的输出结果,因此写入本地磁盘的数据量和传送给Reducer的数据量都会被减少,通常情况下能很大程序的提高MapReducer任务的效率。 每一次写缓冲达到溢写临界值时,都会形成一个新的溢写文件,因此当Map Task输出最后一个数据时,本地磁盘上会有多个溢写文件存在。在整个Map Task完成之前,这些溢写文件会被合并为一个分区且排序后的文件。合并可能分为多次,属性io.sort.factor控制一次最多合并多少个文件。 如果溢写文件个数超过3(通过属性min.num.spills.for.combine设置),会对合并且分区排序后的结果执行Combine过程(如果MapReduce有设置Combiner),而且combine过程在不影响最终结果的前提下可能会被执行多次;否则不会执行Combine过程(相对而言,Combine开销过大)。 注意:Map Task执行过程中,Combine可能出现在两个地方:写缓冲区溢写过程中、溢写文件合并过程中。 通过对Map Task的输出结果进行压缩是一个好主意,可以加快写入磁盘的速度、节省磁盘空间以及减少需要传递给Reducer的数据量。默认情况下,压缩是不被开启的,可以通过属性mapred.compress.map.output、mapred.map.output.compression.codec进行相应设置。 以上两步主要对应着上图中"partition, sort, and spill to disk",但代码与实际执行过程略有不同。在Hadoop 0.20.2版本中,写缓冲区由org.apache.hadoop.mapred.MapTask.MapOutputBuffer实现,写缓冲区代码如下: 1 2 3 4 5 6 @Override public void write(K key, V value) throws IOException, InterruptedException { collector.collect(key, value, partitioner.getPartition(key, value, partitions)); } 可以看出,在将Mapper的一条输出结果(由key、value表示)写出到写缓冲区之前,已经提前计算好相应的分区信息,即分区的过程在数据写入写缓冲区之前就已经完成,溢写过程实际是写缓冲区数据排序的过程(先按分区排序,如果分区相同时,再按键值排序)。 这里涉及到MapReduce的两个组件:Comparator、Partitioner。 (1)Comparator Comparator不会影响对分区排序的过程,它影响的是对键值的排序过程,代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public int compare(int i, int j) { final int ii = kvoffsets[i % kvoffsets.length]; final int ij = kvoffsets[j % kvoffsets.length]; // sort by partition if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) { return kvindices[ii + PARTITION] - kvindices[ij + PARTITION]; } // sort by key return comparator.compare(kvbuffer, kvindices[ii + KEYSTART], kvindices[ii + VALSTART] - kvindices[ii + KEYSTART], kvbuffer, kvindices[ij + KEYSTART], kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]); } comparator的实例化是通过job.getOutputKeyComparator()完成的,代码如下: 复制代码 public RawComparator getOutputKeyComparator() { Class extends RawComparator> theClass = getClass( "mapred.output.key.comparator.class", null, RawComparator.class); if (theClass != null) { return ReflectionUtils.newInstance(theClass, this); } return WritableComparator.get(getMapOutputKeyClass().asSubclass( WritableComparable.class)); } 复制代码 由上述代码可以看出,comparator实例有两种提供方式,亦即我们可以自定义扩展组件Comparator的方式: a. 继承接口RawComparator,并通过属性mapred.output.key.comparator.class进行配置; b. 为Map Output Key(Map Output Key必须实现WritableComparable接口)设置相应的WritableComparator,并通过WritableComparator的静态方法define进行注册。 RawComparator接口代码如下: public interfacewww.76seo.com RawComparator标签:
原文地址:http://www.cnblogs.com/sagqawe/p/4179054.html