码迷,mamicode.com
首页 > 其他好文 > 详细

mapreduce任务中client生成split的源码过程

时间:2020-12-21 11:24:14      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:BMI   sre   移动   win   ted   mode   tfs   make   util   

 

 

1、提交任务

//提交任务直到返回结果
job.waitForCompletion(true);

2、点击waitForCompletion方法到Job.java类

技术图片

技术图片

3、跟进到submit方法,调用方法:

return submitter.submitJobInternal(Job.this, cluster);

技术图片

技术图片

4、跟进submitJobInternal方法,跳转到JobSubmitter.java的submitJobInternal方法,经过读取配置和设置、上传信息到HDFS,开始写分片信息:

// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
技术图片

技术图片

5、跟进到writeSplits方法,2.X会走writeNewSplits分支。

技术图片

技术图片

6、跟进到writeNewSplits方法,可以看到会通过反射创建一个输入类型实例。

InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
技术图片

技术图片

7、跟进到getInputFormatClass方法,会跳转到JobContext.java,是一个接口,不会实现getInputFormatClass方法。

/**
* Get the {@link InputFormat} class for the job.

* @return the {@link InputFormat} class for the job.
*/
public Class<? extends InputFormat<?,?>> getInputFormatClass() 
  throws ClassNotFoundException;

点击其实现类,JobContextImpl.java

技术图片

技术图片

public Class<? extends InputFormat<?,?>> getInputFormatClass() 
  throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>) 
  conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}

可以看到conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);如果不设置输入类型,hadoop默认为TextInputFormat

其继承关系如下:

技术图片

技术图片

到此,获取到了InputFormatClass。

8、在获取输入类型的,回到writeNewSplits方法中继续执行,如下代码开始获取分片:

List<InputSplit> splits = input.getSplits(job);

技术图片

技术图片

9、跟进到input.getSplits方法,跳转到InputFormat.java抽象类里,也没有实现这个方法。

查找其实现类,发现在FileInputFormat子类里有实现。

技术图片

技术图片

10、跟进到FileInputFormat子类里的实现,首先获取split的minSize和maxSize

 

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

跟进到getFormatMinSplitSize,代码如下,直接返回1

技术图片

技术图片

跟进到getMinSplitSize(job)方法,代码如下:

public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

参数SPLIT_MINSIZE,如果没有设置,也取1L

public static final String SPLIT_MINSIZE = 
"mapreduce.input.fileinputformat.split.minsize";

所以总得来说splitSize默认最小值为1。

再跟进到getMaxSplitSize方法,代码如下:

public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                          Long.MAX_VALUE);
}

如果没有设置参数SPLIT_MAXSIZE,splitSize默认最大值为Long.MAX_VALUE。

11、继续往下执行,针对每一个输入的文件进行遍历:

技术图片

技术图片

 

12、每次遍历文件,很重要的方法就是计算split大小,即computeSplitSize方法,跟进到这个方法的实现。

protected long computeSplitSize(long blockSize, long minSize,
                              long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

通过这个逻辑,可以得出以下调整split大小的方法,如果要将split大小调整的比blockSize小,则调整maxSize大小比较blockSize值小即可。如果要将split大小调整到比blockSize大,则调整minSize的大小比blockSize值大即可。

 

13、继续执行,以下代码实现将生成分片与文件块的位置对应关系,为后续计算向数据移动,提供必要的信息。

 

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            blkLocations[blkIndex].getHosts(),
            blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

path:文件路径

length-bytesRemaining:split起始的offset

splitSize:split的大小,可以获取结束的offset blkLocations[blkIndex].getHosts():分片分布的主机。 blkLocations[blkIndex].getCachedHosts()):缓存

 

到此,client计算出split分配信息,为下一步map运算,计算向数据移动提供了需要的信息,至于如何移动,还要结合资源的情况。

mapreduce任务中client生成split的源码过程

标签:BMI   sre   移动   win   ted   mode   tfs   make   util   

原文地址:https://www.cnblogs.com/windtalker/p/14141560.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!