标签:源码 record int .com 理解 不同的 规划 images 申请
前言
上一篇我们分析了一个MapReduce在执行中的一些细节问题,这一篇分享的是MapReduce并行处理的基本过程和原理。
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
首先要说明的是Hadoop2.0之前和Hadoop2.0之后的区别:
2.0之前只有MapReduce的运行框架,那么它里面有只有两种节点,一个是master,一个是worker。master既做资源调度又做程序调度,worker只是用来参与计算的。
但是在2.0之后加入了YARN集群,Yarn集群的主节点承担了资源调度,Yarn集群的从节点中会选出一个节点(这个由redourcemanager决定)
用作类似于2.0之前的master的工作,来进行应用程序的调度。
资源调度: 处理程序所需要的cpu、内存资源,以及存储数据所需要的硬盘资源都是resourcemanager去分配的。
一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。
图中执行的顺序都用数字标记了。
1)MapReduce库先把user program的输入文件划分为M份(M为用户定义),如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。
2)user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是
可以由用户指定的。
3)被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对
都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
4)缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报
给master,master负责将信息转发给Reduce worker。
5)master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当
Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是
同一个Reduce作业(谁让分区少呢),所以排序是必须的。
6)reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
7)当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。
8)所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一
个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件
系统(GFFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入
键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。
Map/Reduce框架运转在<key, value>键值对上,也就是说,框架把作业的输入看为是一组<key, value>键值对,同样也产出一组 <key, value>键值对做为作业的输出,这两组键
值对的类型可能不同。
框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现Writable接口。另外,为了方便框架执行排序操作,key类必须实现 WritableComparable接口。
注意:不管是哪里的序列化,最主要的作用就是持久化存储或者是用于网络传输
一个Map/Reduce作业的输入和输出类型如下所示:
(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。
其实在前面讲解Hadoop IO的时候已经知道了解了Writale接口:
Writable接口是一个实现了序列化协议的序列化对象。
在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。
mapreduce 其实是分治算法的一种现,所谓分治算法就是“就是分而治之 ,将大的问题分解为相同类型的子问题(最好具有相同的规模),对子问题进行求解,然后合并成大问题的解。
mapreduce就是分治法的一种,将输入进行分片,然后交给不同的task进行处理,然后合并成最终的解。
mapreduce实际的处理过程可以理解为Input->Map->Sort->Combine->Partition->Reduce->Output。
1)Input阶段
数据以一定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以设置,也可以自定义分片函数。
2)map阶段
对输入的(key,value)进行处理,即map(k1,v1)->list(k2,v2),使用Job.setMapperClass进行设置。
3)Sort阶段
对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,然后定义排序规则。
4)Combine阶段
这个阶段对于Sort之后又相同key的结果进行合并,使用Job.setCombinerClass进行设置,也可以自定义Combine Class类。
5)Partition阶段
将Mapper的中间结果按照key的范围划分为R份(Reduce作业的个数),默认使用HashPartioner(key.hashCode()&Integer.MAX_VALUE%numPartitions),也可以自定义划分的函数。
使用Job.setPartitionClass设置。
6)Reduce阶段
对于Mapper阶段的结果进行进一步处理,Job.setReducerClass进行设置自定义的Reduce类。
7)Output阶段
Reducer输出数据的格式。
一个mapreduce作业的执行流程是:作业提交->作业初始化->任务分配->任务执行->更新任务执行进度和状态->作业完成。
一个完整的mapreduce作业流程,包括4个独立的实体:
客户端:client,编写mapreduce程序,配置作业,提交作业。
JobTracker:协调这个作业的运行,分配作业,初始化作业,与TaskTracker进行通信。
TaskTracker:负责运行作业,保持与JobTracker进行通信。
HDFS:分布式文件系统,保持作业的数据和结果。
JobClient使用runjob方法创建一个JobClient实例,然后调用submitJob()方法进行作业的提交,提交作业的具体过程如下:
1)通过调用JobTracker对象的getNewJobId()方法从JobTracker处获得一个作业ID。
2)检查作业的相关路径。如果输出路径存在,作业将不会被提交(保护上一个作业运行结果)。
3)计算作业的输入分片,如果无法计算,例如输入路径不存在,作业将不被提交,错误返回给mapreduce程序。
4)将运行作业所需资源(作业jar文件,配置文件和计算得到的分片)复制到HDFS上。
5)告知JobTracker作业准备执行(使用JobTracker对象的submitJob()方法来真正提交作业)。
当JobTracker收到Job提交的请求后,将Job保存在一个内部队列,并让Job Scheduler(作业调度器)处理并初始化。初始化涉及到创建一个封装了其tasks的job对象,
并保持对task的状态和进度的跟踪(step 5)。当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(step 6),然后
再为每个split创建map task。
TaskTracker和JobTracker之间的通信和任务分配是通过心跳机制完成的。TaskTracker作为一个单独的JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker
发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。如果有待分配的任务,它就会为TaskTracker分配一个任务。
TaskTracker申请到新的任务之后,就要在本地运行了。首先,是将任务本地化(包括运行任务所需的数据、配置信息、代码等),即从HDFS复制到本地。调用localizeJob()完成的。
对于使用Streaming和Pipes创建Map或者Reduce程序的任务,Java会把key/value传递给外部进程,然后通过用户自定义的Map或者Reduce进行处理,然后把key/value传回到java中。
其中就好像是TaskTracker的子进程在处理Map和Reduce代码一样。
进度和状态是通过heartbeat(心跳机制)来更新和维护的。对于Map Task,进度就是已处理数据和所有输入数据的比例。对于Reduce Task,情况就邮电复杂,包括3部分,
拷贝中间结果文件、排序、reduce调用,每部分占1/3。
当Job完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful,同时JobClient也会轮循获知提交的Job已经完成,将信息显示给用户。
最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)
一个完整的mapreduce程序在分布式运行时有三类实例进程:
MRAppMaster:负责整个程序的过程调度及状态协调(Hadoop2.0之后就不一样了)
mapTask:负责map阶段的整个数据处理流程
ReduceTask:负责reduce阶段的整个数据处理流程
流程分析:
1) 一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动
相应数量的maptask进程。
2)maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对
将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
将缓存中的KV对按照K分区排序后不断溢写到磁盘文件
3) MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)
4)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,
然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度
那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
一个job的map阶段并行度由客户端在提交job时决定而客户端对map阶段并行度的规划的基本逻辑为: 将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理
这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:
1)FileInputFormat切片机制切片定义在InputFormat类中的getSplit()方法
2)FileInputFormat中默认的切片机制:
简单地按照文件的内容长度进行切片
切片大小,默认等于block大小
切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
file1.txt 320M
file2.txt 10M
经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
3)FileInputFormat中切片的大小的参数配置
通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由这几个值来运算决定
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默认情况下,切片大小=blocksize
maxsize(切片最大值):
参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值
minsize (切片最小值):
参数调的比blockSize大,则可以让切片变得比blocksize还大
选择并发数的影响因素:
运算节点的硬件配置
运算任务的类型:CPU密集型还是IO密集型
运算任务的数据量
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4);
如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask
尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。
1)概述
mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存)。
具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序。
分区partition(确定哪个数据进入哪个reduce)
Sort根据key排序
Combiner进行局部value的合并
2)详细流程
1、 maptask收集我们的map()方法输出的kv对,放到内存缓冲区中
2、 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、 多个溢出文件会被合并成大的溢出文件
4、 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
5、 reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6、 reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7、 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整, 参数:io.sort.mb 默认100M
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序。
1) yarn并不清楚用户提交的程序的运行机制
2) yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)
3) yarn中的主管角色叫ResourceManager
4) yarn中具体提供运算资源的角色叫NodeManager
5) 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez ……
6) 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可
7) Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享
6.3、YARN中运行运算程序实例
mapreduce程序的调度过程,如下图:
喜欢就点个“推荐”!
标签:源码 record int .com 理解 不同的 规划 images 申请
原文地址:http://www.cnblogs.com/zhangyinhua/p/7729878.html