码迷,mamicode.com
首页 > 其他好文 > 详细

mapreducer计算原理

时间:2016-08-19 18:50:32      阅读:294      评论:0      收藏:0      [点我收藏+]

标签:

mapreducer计算原理

 技术分享

InputFormat

InputFormat的默认实现是TextInputFormat

InputSplit

概念

mapreducer对文件进行处理和运算的输入单位。只是一个逻辑概念。每一个InputSplit没有对文件进行实际的切割。只是记录了要处理文件的位置信息(包括文件的pathhosts、长度(length))。在默认情况下,InputSplitBlock数目是一样的。

getLength

  得到一个InputSplit的长度

getLocations

得到该InputSplit文件的具体的位置,包括复制集的位置

FileSplit

一种split的实现

属性Path file

代表的是文件的路径,这个大的文件的存储路径

属性long start

分片在文件中的起始位置

属性long length

分片的长度

属性String[] hosts

存储分片所在的主机

属性SplitLocationInfo[] hostInfos

存储分片所在的主机的信息

说明

用这四个参数就可以计算出提供给每一个map分片的数据。

RecordReader

为数据读取器接口

next

   boolean next(K key, V value) throws IOException;

InputSplit读取数据,如果返回值为true则keyvalue已经被读取了,如果返回值为false则为最后的数据了

  

createKey

  createKey();

  按照子类给定的规则创建key

createValue

  按照子类给定的规则创建value

 

  V createValue();

 

getPos

返回当前遍历的InputSplit位置

  /**

   * Returns the current position in the input.

   *

   * @return the current position in the input.

   * @throws IOException

   */

  long getPos() throws IOException;

close

关闭当前遍历的InputSplit

  /**

   * Close this {@link InputSplit} to future operations.

   *

   * @throws IOException

   */

  public void close() throws IOException;

getProgress

  /**

   * How much of the input has the {@link RecordReader} consumed i.e.

   * has been processed by?

   *

   * @return progress from <code>0.0</code> to <code>1.0</code>.

   * @throws IOException

   */

  float getProgress() throws IOException;

LineRecordReader
createKey

return new LongWritable();

从该方法可以得知,默认的key是一个偏移量。

createValue

public Text createValue() {

    return new Text();

  }

从这里可以看出value是一个Text,在这里是一行的内容。

nextKeyValue

if (key == null) {

            key = new LongWritable();

        }

        key.set(pos);// key即为偏移量

读取跨InputSplit数据的问题

如果一行数据被分配到了两个InputSplit,怎么样能确保读取到了完整的行呢?在这里用了两个方法搞定。

 技术分享

这里getFilePosition()<=end,而不是getFilePosition()<end说明读到最后一行,还要继续读下去。也就是说还要读下一个InputSplit第一行。

 技术分享

 

start第一个InputSplit如果是第一个InputSplit则从第一个Record读取。如果不是第一个InputSplit则抛弃第一个Record直接读取第二个。

 

这两种机制就可以完全解决数据跨两个InputSplit读取问题。

Map

 技术分享

 

Partition

Partition按照某一个特定的值划分区域,每一个区域将来会送给特定的reducer所以分了多少个区域应该就会产生多少个reducer。这样一来程序的性能会进一步提升。

Sort

map task 开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。这中间的过程比较复杂,并且利用到了内存buffer 来进行已经产生的部分结果的缓存,并在内存buffer 中进行一些预排序来优化整个map 的性能。如上图所示,每一个map 都会对应存在一个内存buffer MapOutputBuffer ,即上图的buffer in memory ),map 会将已经产生的部分结果先写入到该buffer 中,这个buffer 默认是100MB 大小,但是这个大小是可以根据job 提交时的参数设定来调整的,该参数即为: io.sort.mb 。当map 的产生数据非常大时,并且把io.sort.mb 调大,那么map 在整个计算过程中spill 的次数就势必会降低,map task 对磁盘的操作就会变少,如果map tasks 的瓶颈在磁盘上,这样调整就会大大提高map 的计算性能

Combiner

map的reducer过程。当job 指定了combiner 的时候,我们都知道map 介绍后会在map 端根据combiner 定义的函数将map 结果进行合并。运行combiner 函数的时机有可能会是merge 完成之前,或者之后,这个时机可以由一个参数控制,即 min.num.spill.for.combine default 3 ),当job 中设定了combiner ,并且spill 数最少有3 个的时候,那么combiner 函数就会在merge 产生结果文件之前运行。通过这样的方式,就可以在spill 非常多需要merge ,并且很多数据需要做conbine 的时候,减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。

Spill

map 在运行过程中,不停的向该buffer 中写入已有的计算结果,但是该buffer 并不一定能将全部的map 输出缓存下来,当map 输出超出一定阈值(比如100M ),那么map 就必须将该buffer 中的数据写入到磁盘中去,这个过程在mapreduce 中叫做spill map 并不是要等到将该buffer 全部写满时才进行spill ,因为如果全部写满了再去写spill ,势必会造成map 的计算部分等待buffer 释放空间的情况。所以,map 其实是当buffer 被写满到一定程度(比如80% )时,就开始进行spill 。这个阈值也是由一个job 的配置参数来控制,即 io.sort.spill.percent ,默认为0.80 80% 。这个参数同样也是影响spill 频繁程度,进而影响map task 运行周期对磁盘的读写频率的。但非特殊情况下,通常不需要人为的调整。调整io.sort.mb 对用户来说更加方便。

merage

map task 的计算部分全部完成后,如果map 有输出,就会生成一个或者多个spill 文件,这些文件就是map 的输出结果。map 在正常退出之前,需要将这些spill 合并(merge )成一个,所以map 在结束之前还有一个merge 的过程。merge 的过程中,有一个参数可以调整这个过程的行为,该参数为: io.sort.factor 。该参数默认为10 。它表示当merge spill 文件时,最多能有多少并行的stream merge 文件中写入。比如如果map 产生的数据非常的大,产生的spill 文件大于10 ,而io.sort.factor 使用的是默认的10 ,那么当map 计算完成做merge 时,就没有办法一次将所有的spill 文件merge 成一个,而是会分多次,每次最多10 stream 。这也就是说,当map 的中间结果非常大,调大io.sort.factor ,有利于减少merge 次数,进而减少map 对磁盘的读写频率,有可能达到优化作业的目的。

Reducer

OutputFormat

outputFormat的默认实现是TextOutputFormat

输出到纯文本文件,格式为:key+” ”+value

TextOutputFormat

属性newline

//的行的产生

private static final byte[] newline    换行符

//用一个换行符产生了新的一行

static {

      try {

        newline = "\n".getBytes(utf8);

      } catch (UnsupportedEncodingException uee) {

        throw new IllegalArgumentException("can‘t find " + utf8 + " encoding");

      }

}

Write方法

public synchronized void write(K key, V value)

      throws IOException {

 

      boolean nullKey = key == null || key instanceof NullWritable;

      boolean nullValue = value == null || value instanceof NullWritable;

      //下面的代码都是为了拼接一行数据

      if (nullKey && nullValue) {

        return;

      }

      if (!nullKey) {

        writeObject(key);    一行中的第一个组成部分:key

      }

      if (!(nullKey || nullValue)) {

        out.write(keyValueSeparator); 一行中的第二个组成部分:tab

      }

      if (!nullValue) {

        writeObject(value);   一行中的第三个组成部分:value

      }

      out.write(newline);     一行中的第四个组成部分:\n   换行符

    }

mapreducer计算原理

标签:

原文地址:http://www.cnblogs.com/zpb2016/p/5788443.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!