标签:调度 调用 实现类 XML math debug 编程 阶段 lan
一、MapReduce基本概述是一个分布式运算程序编程框架。核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式程序,并发运行在一个hadoop集群上。
(1)优点
1>易于编程:以普通程序的编程方法加上使用MapReduce提供的接口,可以快速完成分布式程序的编写。
2>良好的扩展性:计算资源得不到满足时,可以通过简单的增加计算机器来扩展计算能力
3>高容错性:如果一个任务所在计算节点挂了,上面的计算任务可以自动转移到另外的节点上执行,即故障自动转移,这个过程是内部完成的,无需人工干预
4>适合PB级别以上数据的离线处理
(2)缺点
1>实时计算:无法像mysql一样在毫秒级或者秒级返回计算结果
2>流式计算:流式计算的输入数据是动态的,而MapReduce要求输入数据是静态的,已经持久化在存储上的。
3>DAG(有向无环图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出,这种情况下,MapReduce的性能很低。因为MapReduce的每个阶段的输出结果都会先写入到磁盘中,大量的磁盘IO会造成性能的急剧下降。
核心思想就是分为map和reduce两个阶段。
1)首先将输出的数据进行切片处理,然后各个切片数据分给独立的一个map task任务。map内部根据业务逻辑对数据进行统计处理。每个map task之间互不影响。
2)接着就是将所有map task 的输出作为 reduce task的输入(reduce task的数量与分区有关,后面细讲),将各个map task的局部统计汇总成全局统计,最终完成结果输出
3)MapReduce编程模型中只能由一个map和reduce阶段,多个MapReduce程序只能串行运行,无法并行运行
基本概述:
1)当我们编写完MR作业后,需要通过JobClient来提交一个job,提交的信息会发送到JobTracker模块,这个模块是第一代MapReduce计算框架的核心之一,它负责与集群中的其他节点维持心跳,为提交的作业分配资源,管理提交的作业的正常运作(失败,重启等)。
2)第一代MapReduce的另一个核心的功能是TaskTracker,在各个TaskTracker安装节点上,它的主要功能是监控自己所在节点的资源使用情况。
3)TaskTracker监控当前节点的Tasks的运行情况,其中包含Map Task和Reduce Task,最后由Reduce Task到Reduce阶段,将结果输送到HDFS的文件系统中;其中的具体流程如图中描述的1-7步骤。TaskTracker在监控期间,需要把这些信息通过心跳机制发送给JobTracker,JobTracker收集到这些信息后,给新提交的作业分配其他的资源,避免重复资源分配。
缺点:
1)JobTracker是第一代MapReduce的入口点,若是JobTracker服务宕机,整个服务将会瘫痪,存在单点问题。
2)JobTracker负责的事情太多,完成来太多的任务,占用过多的资源,当Job数非常多的时候,会消耗很多内存,容易出现性能瓶颈。
3)对TaskTracker而言,Task担当的角色过于简单,没有考虑到CPU及内存的使用情况,若存在多个大内存的Task被集中调度,容易出现内存溢出。
4)另外,TaskTracker把资源强制分为map task slot和reduce task slot,若是MR任务中只存在其中一个(map或是reduce),会出现资源浪费的情况,资源利用率低。也就是说资源是静态分配的
V2比起V1最大的不同就是增加了 yarn 这个组件。
? 架构重构的基本思想在于将JobTracker的两个核心的功能单独分离成独立的组件了。分离后的组件分别为资源管理(Applications Manager)和任务调度器(Resource Scheduler)。新的资源管理器(Resource Manager)管理整个系统的资源分配,而每一个Node Manager下的App Master(Application Master)负责对应的调度和协调工作(每个MapReduce任务都有一个对应的app master),而在实际中,App Master从Resource Manager上获得资源,让Node Manager来协同工作和任务监控。
? 对比于MR V1中的Task的监控,重启等内热都交由App Master来处理,Resource Manager提供中心服务,负责资源的分配与调度。Node Manager负责维护Container的状态,并将收集的信息上报给Resource Manager,以及负责和Resource Manager维持心跳。
优点:
1)减少资源消耗,让监控每一个作业更加分布式了。
2)加入了yarn之后,支持更多的编程模型,比如spark等
3)将资源以内存量的概念来描述,比V1中的slot更加合理,而且资源都是动态分配
4)资源的调度和分配更加有层次化,RM负责总的资源管理和调度,每个节点上的appMaster负责当前节点的资源管理和调度
其中上面从第7步到16步称为shuffle机制,
1)maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程中,及合并的过程中,都要调用partitioner进行分区和针对key进行排序
5)reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6)reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
由MapReduce的工作流程可以知道,maptask的数量决定于切片的数量,所以我们看看切片的原理。
在MapReduce的工作流程中,在对数据进行map运算前,会先对数据进行切片处理,然后每一片交给一个独立的map task进行处理。那么map task是如何获取到切片实现类的呢?
首先 MapTask是以 run 方法为入口开始map任务的。
/*
MapTask.java
*/
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException {
//此处省略好多代码,直接看这个方法,其实就是新旧api的兼容
this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
} else {
this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
}
this.done(umbilical, reporter);
}
}
//下面是 runNewMapper 方法
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(JobConf job, TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException {
................
//这里就看到获取 inputFormat 的实现类,关键就在于 taskContext这对象,它的类是 TaskAttemptContextImpl
InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
}
/*
TaskAttemptContextImpl.java 继承 JobContextImpl类
JobContextImpl 实现了 JobContext 接口,该接口定义很多set和get方法,用于配置job上下文对象的
*/
public class JobContextImpl implements JobContext {
public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
//可以看到这里就是从conf对象中获取inputformat.class,默认值就是TextInputFormat
return this.conf.getClass("mapreduce.job.inputformat.class", TextInputFormat.class);
}
}
由此我们可以看到,默认处理输入数据的类是 TextInputFormat,但是这个类并没有实现切片方法,在它的父类 FileInputFormat中实现了切片方法:
/*
FileInputFormat.java
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = (new StopWatch()).start();
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
//这个就是存储切片信息的数组
List<InputSplit> splits = new ArrayList();
//获取输入路径的所有文件
List<FileStatus> files = this.listStatus(job);
Iterator i$ = files.iterator();
while(true) {
while(true) {
while(i$.hasNext()) {
FileStatus file = (FileStatus)i$.next();
Path path = file.getPath();
long length = file.getLen();
if (length != 0L) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
//获取文件块信息
blkLocations = ((LocatedFileStatus)file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0L, length);
}
//从这里开始就正式切片
if (this.isSplitable(job, path)) {
long blockSize = file.getBlockSize();
//获取切片大小
long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining;
int blkIndex;
//循环对文件进行切片,可以看到这里是判断文件剩余部分是否大于1.1倍的切片大小的
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
//将文件,切片起始和终止位置,切片大小,切片的block所在主机等记录到切片数组中作为切片信息。
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
//这里是将文件最后的内容作为最后一个切片添加到切片规划中
if (bytesRemaining != 0L) {
blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
}
} else {
splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
}
} else {
splits.add(this.makeSplit(path, 0L, length, new String[0]));
}
}
job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
}
}
/*
这个方法是决定切片大小的,简单说主要决定于 maxsize和blocksize 的大小,
maxsize > blockSize, 则 splitsize = blockSize
maxsize < blockSize, 则 splitsize = maxsize
minSize>blockSize,则 splitsize = minSize
minSize<blockSize,则 splitsize = blockSize
当然要注意的是,maxsize需要是永远大于minSize的
*/
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
上面的切片值是规划而已,并没有真正的切片,而是当job提交给yarn执行的之后,才会真正按照切片规划进行数据读取。上述切片的特点总结如下:
1)按照文件的内容的长度进行切片
2)切片是按照每个文件独立进行切片,并不会将所有文件当做一个整体去切片,这样有缺点(后面讲)
3)切片大小:默认为blocksize,计算机制如上,这里不重复
FileInputFormat.setMaxInputSplitSize(); maxsize
FileInputFormat.setMinIutputSplitSize(); minsize
可以通过设置两个值来改变切片大小
4)切片的方式:根据源码,每次切片时,都会判断切完剩下的部分是否大于splitSize的1.1倍,如果不大于,那么此时切片就终止,并将剩下的部分作为最后一个切片。
我们从(2)中可以知道,TextInputFormat(FileInputFormat)切片时是按照文件进行切片的,也就是说一个文件至少是一个切片,无论文件的大小是多大。而如果有大量的小文件,那么就会生成很多个maptask,处理效率很低。对于这种情况,解决方案为:
1)从数据源头上解决,将数据合并后再上传至HDFS,不产生大量小文件
2)如果必须处理大量小文件,那么就采用CombineTextInputFormat来进行切片。
切片逻辑如下(源码挺长的,下面直接说我研究源码后的结果):
首先CombineTextInputFormat没有实现 getSplit() 方法,而是由它的父类 CombineFileInputformat实现的,它会将一个目录下的多个文件作为一个整体的数据源进行切片,切片的大小取决于 MaxSplitSize 设定的最大切片大小大小,单位是byte。切片逻辑为
totalSize<=1.5*MaxSplitSize 1片, splitSize=totalSize
1.5*MaxSplitSize<totalsize<2*MaxSplitSize 2片,splitSize=MaxSplitSize
totalsize>2*MaxSplitSize n片,splitSize=MaxSplitSize
要注意的是:
如果总的数据大小远大于MaxSplitSize时,切到最后一片的时候,会判断切片后,剩下的部分是否大于2倍MaxSplitSize,如果不大于,就算作一片,如果大于就两片
使用 CombineTextInputFormat 作为InpuFormat的操作类:
//设置 InputFormat的类为CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
//分别设置切片最大值和最小值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
前面说到,map task的数量决定于切片的数量,那么reduce task的数量决定于什么呢?决定于分区的数量。
1)首先需要自定义一个分区类,并继承 Partitioner<key,value>
2)重写public Int getPartition() 方法。返回的是分区号
3)在job中设置自定义的类为分区类,否则默认的分区类就是HashPartitioner
job.setPartitionerClass(CustomPartitioner.class);
4)设置reduce task数量,一般和分区数相同 ,
job.setNumReduceTasks(N);
注意:分区数和reduce task数的联系
如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
//默认分区号,如果都不符合下面条件,则KV划分到这个分区
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
标签:调度 调用 实现类 XML math debug 编程 阶段 lan
原文地址:https://blog.51cto.com/kinglab/2443452