标签:key 关系 app ntc 分享 sort 执行 调整 images
如果我们须要处理一批有关天气的数据。其格式例如以下:
0067011990999991950051507+0000+ 0043011990999991950051512+0022+ 0043011990999991950051518-0011+ 0043012650999991949032412+0111+ 0043012650999991949032418+0078+ 0067011990999991937051507+0001+ 0043011990999991937051512-0002+ 0043011990999991945051518+0001+ 0043012650999991945032412+0002+ 0043012650999991945032418+0078+ |
如今须要统计出每年的最高温度。
Map-Reduce主要包含两个步骤:Map和Reduce
每一步都有key-value对作为输入和输出:
对于上面的样例,在map过程,输入的key-value对例如以下:
(0,0067011990999991950051507+0000+) (33,0043011990999991950051512+0022+) (66,0043011990999991950051518-0011+) (99,0043012650999991949032412+0111+) (132,0043012650999991949032418+0078+) (165,0067011990999991937051507+0001+) (198,0043011990999991937051512-0002+) (231,0043011990999991945051518+0001+) (264,0043012650999991945032412+0002+) (297,0043012650999991945032418+0078+) |
在map过程中。通过对每一行字符串的解析,得到年-温度的key-value对作为输出:
(1950, 0) (1950, 22) (1950, -11) (1949, 111) (1949, 78) (1937, 1) (1937, -2) (1945, 1) (1945, 2) (1945, 78) |
在reduce过程。将map过程中的输出。依照同样的key将value放到同一个列表中作为reduce的输入
(1950, [0, 22, –11]) (1949, [111, 78]) (1937, [1, -2]) (1945, [1, 2, 78]) |
在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:
(1950, 22) (1949, 111) (1937, 1) (1945, 78) |
其逻辑过程可用例如以下图表示:
下图大概描写叙述了Map-Reduce的Job执行的基本原理:
以下我们讨论JobConf。其有非常多的项能够进行配置:
当然不用全部的都设置。由上面的样例。能够编写Map-Reduce程序例如以下:
public class MaxTemperature { } |
Map-Reduce的处理过程主要涉及下面四个部分:
JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。
提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务执行完成。
当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。
初始化首先创建一个对象来封装job执行的tasks, status以及progress。
在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。
其为每一个input split创建一个map task。
每一个task被分配一个ID。
TaskTracker周期性的向JobTracker发送heartbeat。
在heartbeat中。TaskTracker告知JobTracker其已经准备执行一个新的task。JobTracker将分配给其一个task。
在JobTracker为TaskTracker选择一个task之前。JobTracker必须首先依照优先级选择一个Job,在最高优先级的Job中选择一个task。
TaskTracker有固定数量的位置来执行map task或者reduce task。
默认的调度器对待map task优先于reduce task
当选择reduce task的时候。JobTracker并不在多个task之间进行选择,而是直接取下一个,由于reducetask没有数据本地化的概念。
TaskTracker被分配了一个task,以下便要执行此task。
首先。TaskTracker将此job的jar从共享文件系统中复制到TaskTracker的文件系统中。
TaskTracker从distributed cache中将job执行所须要的文件复制到本地磁盘。
其次,其为每一个task创建一个本地的工作文件夹。将jar解压缩到文件文件夹中。
其三,其创建一个TaskRunner来执行task。
TaskRunner创建一个新的JVM来执行task。
被创建的child JVM和TaskTracker通信来报告执行进度。
MapRunnable从inputsplit中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。
map的输出并非直接写入硬盘,而是将其写入缓存memory buffer。
当buffer中数据的到达一定的大小。一个背景线程将数据開始写入硬盘。
在写入硬盘之前,内存中的数据通过partitioner分成多个partition。
在同一个partition中,背景线程会将数据依照key在内存中排序。
每次从内存向硬盘flush数据。都生成一个新的spill文件。
当此task结束之前。全部的spill文件被合并为一个整的被partition的并且排好序的文件。
reducer能够通过http协议请求map的输出文件,tracker.http.threads能够设置http服务线程数。
当map task结束后。其通知TaskTracker。TaskTracker通知JobTracker。
对于一个job,JobTracker知道TaskTracer和map输出的相应关系。
reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了全部的map输出。
reduce task须要其相应的partition的全部的map输出。
reduce task中的copy过程即当每一个map task结束的时候就開始拷贝输出。由于不同的maptask完毕时间不同。
reduce task中有多个copy线程,能够并行拷贝map输出。
当非常多map输出复制到reduce task后。一个背景线程将其合并为一个大的排好序的文件。
当全部的map输出都复制到reduce task后,进入sort过程,将全部的map输出合并为大的排好序的文件。
最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每一个key。最后的结果写入HDFS。
当JobTracker获得最后一个task的执行成功的报告后,将job得状态改为成功。
当JobClient从JobTracker轮询的时候。发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。
如有不懂,欢迎拨打10010或10086。转何哲江。
标签:key 关系 app ntc 分享 sort 执行 调整 images
原文地址:http://www.cnblogs.com/cxchanpin/p/6876533.html