When a MapReduce job runs, the Map side first reads the data from HDFS and splits it, parsing each line of each file into key-value pairs; the Map output then becomes the input of the Reduce side.
More concretely: each Mapper task is a Java process. It reads a file from HDFS, parses it into many key-value pairs, processes them with the map method we override, and emits many new key-value pairs. The work of a Mapper task can be broken down into the stages below.
A Mapper task's run is divided into six stages:

1. The input file is divided into input splits (InputSplit) of a fixed size; by default a split is the size of an HDFS block, and each split is handled by one Mapper task.
2. The records in a split are parsed into key-value pairs; by default each line of text becomes one pair, whose key is the byte offset at which the line starts and whose value is the text of the line.
3. The map method is called once for each input pair, and each call emits zero or more output pairs.
4. The emitted pairs are partitioned by some rule; by default there is a single partition.
5. The pairs in each partition are sorted by key.
6. Optionally, the sorted pairs in each partition are reduced locally (a combiner step); this stage does not run by default.
Each Reducer task is also a Java process. A Reducer task receives the output of the Mapper tasks, reduces it, and writes the result to HDFS. It can be divided into the following stages.
1. In the first stage, the Reducer task actively copies the output key-value pairs from the Mapper tasks. There may be many Mapper tasks, so a Reducer may copy the output of multiple Mappers.
2. In the second stage, all the data copied to the Reducer's local disk is merged: the scattered pieces are combined into one large data set, which is then sorted.
3. In the third stage, the reduce method is called on the sorted key-value pairs. It is invoked once per distinct key, each invocation may emit zero or more key-value pairs, and the emitted pairs are finally written to a file in HDFS.
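As a minimal sketch of this contract (the class name and threshold logic here are hypothetical, not from the original post): reduce sees one key together with all of the copied-and-merged values for that key, and is free to emit nothing at all.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Hypothetical example: emit a key only if its values sum past a threshold,
    // illustrating that one reduce call may produce zero, one, or more outputs.
    public class ThresholdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {   // all values that share this key,
                sum += v.get();              // already copied, merged, and sorted
            }
            if (sum > 100) {                 // zero outputs when the test fails
                context.write(key, new IntWritable(sum));
            }
        }
    }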
In the analysis of the Mapper and Reducer tasks, key-value pairs appear at many stages. To make it easier to follow how they change, we number them as follows.
The key-value pair that is input to the Mapper task is defined as key1 and value1. The pair output by the map method is defined as key2 and value2. The reduce method receives key2 and value2 and, after processing, outputs key3 and value3. In the discussion below, key1 and value1 may be abbreviated as <k1,v1>, key2 and value2 as <k2,v2>, and key3 and value3 as <k3,v3>.
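In the Hadoop API this numbering maps one-to-one onto the generic parameters of Mapper and Reducer. A sketch using the types of the temperature program shown later in this post:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    //               <k1,           v1,   k2,   v2>
    class M extends Mapper<LongWritable, Text, Text, IntWritable> { }
    //               <k2,   v2,          k3,   v3>
    class R extends Reducer<Text, IntWritable, Text, IntWritable> { }
    // The Reducer's input types <k2,v2> must equal the Mapper's output types.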
Take the following sample data as the input file; each line holds one record:

    2014010114
    2014010216
    2014010317
    2014010410
    2014010506
    2012010609
    2012010732
    2012010812
    2012010919
    2012011023
    2001010116
    2001010212
    2001010310
    2001010411
    2001010529
    2013010619
    2013010722
    2013010812
    2013010929
    2013011023
    2008010105
    2008010216
    2008010337
    2008010414
    2008010516
    2007010619
    2007010712
    2007010812
    2007010999
    2007011023
    2010010114
    2010010216
    2010010317
    2010010410
    2010010506
    2015010649
    2015010722
    2015010812
    2015010999
    2015011023
Each record encodes a date followed by a temperature. For example, 2010012325 means that on 2010-01-23 the temperature was 25 degrees. The task: use MapReduce to compute the highest temperature recorded in each year.
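Since every record is a fixed-width string (8 digits of date, then the temperature), the year and the temperature can be pulled out by position. A minimal standalone sketch of the parsing that the Mapper below performs (the class name is ours, for illustration only):

    public class RecordParsing {
        public static void main(String[] args) {
            // Fixed-width record: chars 0-3 = year, chars 4-7 = month/day, chars 8+ = temperature.
            String line = "2010012325";
            String year = line.substring(0, 4);                    // "2010"
            int temperature = Integer.parseInt(line.substring(8)); // 25
            System.out.println(year + " -> " + temperature);
        }
    }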
The complete program:

    package com.abc.yarn;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Temperature {
        /**
         * The four generic types are:
         * KeyIn    the Mapper's input key: the byte offset at which the line starts (0, 11, ...)
         * ValueIn  the Mapper's input value: the text of the line
         * KeyOut   the Mapper's output key: the "year" in the line
         * ValueOut the Mapper's output value: the "temperature" in the line
         */
        static class TempMapper extends
                Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // sample print: Before Mapper: 0, 2000010115
                System.out.print("Before Mapper: " + key + ", " + value);
                String line = value.toString();
                String year = line.substring(0, 4);
                int temperature = Integer.parseInt(line.substring(8));
                context.write(new Text(year), new IntWritable(temperature));
                // sample print: After Mapper:2000, 15
                System.out.println(
                    "======" +
                    "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
            }
        }

        /**
         * The four generic types are:
         * KeyIn    the Reducer's input key: the "year"
         * ValueIn  the Reducer's input value: the "temperature"
         * KeyOut   the Reducer's output key: the distinct "year"
         * ValueOut the Reducer's output value: the highest temperature of that year
         */
        static class TempReducer extends
                Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                int maxValue = Integer.MIN_VALUE;
                StringBuilder sb = new StringBuilder();
                // take the maximum of the values
                for (IntWritable value : values) {
                    maxValue = Math.max(maxValue, value.get());
                    sb.append(value).append(", ");
                }
                // sample print: Before Reduce: 2000, 15, 23, 99, 12, 22,
                System.out.print("Before Reduce: " + key + ", " + sb.toString());
                context.write(key, new IntWritable(maxValue));
                // sample print: After Reduce: 2000, 99
                System.out.println(
                    "======" +
                    "After Reduce: " + key + ", " + maxValue);
            }
        }

        public static void main(String[] args) throws Exception {
            // input path
            String dst = "hdfs://localhost:9000/input.txt";
            // output path: it must not exist yet, not even as an empty folder
            String dstOut = "hdfs://localhost:9000/output";
            Configuration hadoopConfig = new Configuration();

            hadoopConfig.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
            );
            hadoopConfig.set("fs.file.impl",
                org.apache.hadoop.fs.LocalFileSystem.class.getName()
            );
            Job job = new Job(hadoopConfig);

            // needed if the job is packaged as a jar and run on a cluster
            //job.setJarByClass(Temperature.class);

            // input and output paths for the job
            FileInputFormat.addInputPath(job, new Path(dst));
            FileOutputFormat.setOutputPath(job, new Path(dstOut));

            // register our Mapper and Reducer as the task classes of the two phases
            job.setMapperClass(TempMapper.class);
            job.setReducerClass(TempReducer.class);

            // types of the final output key and value
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // run the job and wait until it finishes
            job.waitForCompletion(true);
            System.out.println("Finished");
        }
    }
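An aside not in the original program: because taking a maximum is associative and commutative, the same TempReducer class could also be registered as a combiner, which corresponds to the optional local-reduce stage of the Map process described earlier. The line below would go in main, next to job.setReducerClass:

    // Optional: pre-reduce each Mapper's output locally before the shuffle.
    // Valid here because max(max(a, b), c) == max(a, b, c).
    job.setCombinerClass(TempReducer.class);

If you try this, note that TempReducer's Before/After Reduce prints will then also appear during the map phase, since the combiner runs the same code on the map side.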
In the code above, note that the generic parameters of the Mapper class are not Java's primitive types but Hadoop's own data types, Text and IntWritable. For our purposes they can be treated as rough equivalents of Java's String and int.
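A minimal illustration of moving between the two worlds (the class name is ours, for illustration only):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class WritableDemo {
        public static void main(String[] args) {
            // Wrap plain Java values in Hadoop's Writable types...
            Text year = new Text("2014");
            IntWritable temp = new IntWritable(14);
            // ...and unwrap them again.
            String y = year.toString(); // "2014"
            int t = temp.get();         // 14
            System.out.println(y + " -> " + t);
        }
    }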
The generic parameters of the Mapper class correspond, in order, to <k1, v1, k2, v2>. The second parameter of the map method is the text of one line, which is the part we care about. The core code extracts the "year" and the "temperature" from the line by position (the records are fixed-width, with no delimiter to split on), makes the year the new key and the temperature the new value, and writes them to the context. Because each year has several lines of data, every line produces one <year, temperature> pair.
Running the program prints the following on the console:

    Before Mapper: 0, 2014010114======After Mapper:2014, 14
    Before Mapper: 11, 2014010216======After Mapper:2014, 16
    Before Mapper: 22, 2014010317======After Mapper:2014, 17
    Before Mapper: 33, 2014010410======After Mapper:2014, 10
    Before Mapper: 44, 2014010506======After Mapper:2014, 6
    Before Mapper: 55, 2012010609======After Mapper:2012, 9
    Before Mapper: 66, 2012010732======After Mapper:2012, 32
    Before Mapper: 77, 2012010812======After Mapper:2012, 12
    Before Mapper: 88, 2012010919======After Mapper:2012, 19
    Before Mapper: 99, 2012011023======After Mapper:2012, 23
    Before Mapper: 110, 2001010116======After Mapper:2001, 16
    Before Mapper: 121, 2001010212======After Mapper:2001, 12
    Before Mapper: 132, 2001010310======After Mapper:2001, 10
    Before Mapper: 143, 2001010411======After Mapper:2001, 11
    Before Mapper: 154, 2001010529======After Mapper:2001, 29
    Before Mapper: 165, 2013010619======After Mapper:2013, 19
    Before Mapper: 176, 2013010722======After Mapper:2013, 22
    Before Mapper: 187, 2013010812======After Mapper:2013, 12
    Before Mapper: 198, 2013010929======After Mapper:2013, 29
    Before Mapper: 209, 2013011023======After Mapper:2013, 23
    Before Mapper: 220, 2008010105======After Mapper:2008, 5
    Before Mapper: 231, 2008010216======After Mapper:2008, 16
    Before Mapper: 242, 2008010337======After Mapper:2008, 37
    Before Mapper: 253, 2008010414======After Mapper:2008, 14
    Before Mapper: 264, 2008010516======After Mapper:2008, 16
    Before Mapper: 275, 2007010619======After Mapper:2007, 19
    Before Mapper: 286, 2007010712======After Mapper:2007, 12
    Before Mapper: 297, 2007010812======After Mapper:2007, 12
    Before Mapper: 308, 2007010999======After Mapper:2007, 99
    Before Mapper: 319, 2007011023======After Mapper:2007, 23
    Before Mapper: 330, 2010010114======After Mapper:2010, 14
    Before Mapper: 341, 2010010216======After Mapper:2010, 16
    Before Mapper: 352, 2010010317======After Mapper:2010, 17
    Before Mapper: 363, 2010010410======After Mapper:2010, 10
    Before Mapper: 374, 2010010506======After Mapper:2010, 6
    Before Mapper: 385, 2015010649======After Mapper:2015, 49
    Before Mapper: 396, 2015010722======After Mapper:2015, 22
    Before Mapper: 407, 2015010812======After Mapper:2015, 12
    Before Mapper: 418, 2015010999======After Mapper:2015, 99
    Before Mapper: 429, 2015011023======After Mapper:2015, 23
    Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
    Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
    Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
    Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
    Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
    Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
    Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
    Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99
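Notice that the keys in the Before Mapper lines advance by exactly 11: each record is 10 characters plus a newline, so line i starts at byte 11 * i. A tiny check (our own helper, for illustration only):

    public class OffsetCheck {
        public static void main(String[] args) {
            // Each input record is 10 chars + '\n', so line i begins at byte 11 * i,
            // matching the keys the Mapper printed: 0, 11, 22, ...
            for (int i = 0; i < 5; i++) {
                System.out.println("line " + i + " starts at byte " + (11 * i));
            }
        }
    }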
Program execution result: judging from the After Reduce lines above (the <k3,v3> pairs written by reduce), the output file in HDFS contains:

    2001    29
    2007    99
    2008    37
    2010    17
    2012    32
    2013    29
    2014    17
    2015    99
Verifying the analysis against the printed log:

Stage 1 of the Map process cannot be demonstrated here, because the input data is too small to be split into more than one piece, but it does happen in general. The Before Mapper lines confirm stage 2 of the Map process: the "key" is the position (in bytes) at which each line starts, and the "value" is the text of that line. Furthermore, the Reduce lines:
    Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
    Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
    Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
    Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
    Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
    Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
    Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
    Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99
confirm stage 4 of the Map process: the pairs are first partitioned and grouped by key, and a reduce pass then runs once over each group (local reduction being stage 6 of the Map process). As mentioned earlier, if no Reduce step of ours runs, the Mapper's output is written to the output file essentially as-is. So we remove our reduce step by commenting out the job.setReducerClass(TempReducer.class) line (Hadoop then falls back to its default pass-through Reducer) and run again. The console prints:
    Before Mapper: 0, 2014010114======After Mapper:2014, 14
    Before Mapper: 11, 2014010216======After Mapper:2014, 16
    Before Mapper: 22, 2014010317======After Mapper:2014, 17
    Before Mapper: 33, 2014010410======After Mapper:2014, 10
    Before Mapper: 44, 2014010506======After Mapper:2014, 6
    Before Mapper: 55, 2012010609======After Mapper:2012, 9
    Before Mapper: 66, 2012010732======After Mapper:2012, 32
    Before Mapper: 77, 2012010812======After Mapper:2012, 12
    Before Mapper: 88, 2012010919======After Mapper:2012, 19
    Before Mapper: 99, 2012011023======After Mapper:2012, 23
    Before Mapper: 110, 2001010116======After Mapper:2001, 16
    Before Mapper: 121, 2001010212======After Mapper:2001, 12
    Before Mapper: 132, 2001010310======After Mapper:2001, 10
    Before Mapper: 143, 2001010411======After Mapper:2001, 11
    Before Mapper: 154, 2001010529======After Mapper:2001, 29
    Before Mapper: 165, 2013010619======After Mapper:2013, 19
    Before Mapper: 176, 2013010722======After Mapper:2013, 22
    Before Mapper: 187, 2013010812======After Mapper:2013, 12
    Before Mapper: 198, 2013010929======After Mapper:2013, 29
    Before Mapper: 209, 2013011023======After Mapper:2013, 23
    Before Mapper: 220, 2008010105======After Mapper:2008, 5
    Before Mapper: 231, 2008010216======After Mapper:2008, 16
    Before Mapper: 242, 2008010337======After Mapper:2008, 37
    Before Mapper: 253, 2008010414======After Mapper:2008, 14
    Before Mapper: 264, 2008010516======After Mapper:2008, 16
    Before Mapper: 275, 2007010619======After Mapper:2007, 19
    Before Mapper: 286, 2007010712======After Mapper:2007, 12
    Before Mapper: 297, 2007010812======After Mapper:2007, 12
    Before Mapper: 308, 2007010999======After Mapper:2007, 99
    Before Mapper: 319, 2007011023======After Mapper:2007, 23
    Before Mapper: 330, 2010010114======After Mapper:2010, 14
    Before Mapper: 341, 2010010216======After Mapper:2010, 16
    Before Mapper: 352, 2010010317======After Mapper:2010, 17
    Before Mapper: 363, 2010010410======After Mapper:2010, 10
    Before Mapper: 374, 2010010506======After Mapper:2010, 6
    Before Mapper: 385, 2015010649======After Mapper:2015, 49
    Before Mapper: 396, 2015010722======After Mapper:2015, 22
    Before Mapper: 407, 2015010812======After Mapper:2015, 12
    Before Mapper: 418, 2015010999======After Mapper:2015, 99
    Before Mapper: 429, 2015011023======After Mapper:2015, 23
    Finished
Since our reduce logic no longer ran, the output file now holds what is effectively the Mapper's intermediate output. The log shows that the Mapper's output pairs (k2, v2) have exactly the format we wrote out in map: <2010, 25>, <2012, 23>, ... The result also shows that every line of the source file produced one output line, which confirms stage 3 of the Map process. And it shows that the "years" no longer appear in the order in which they were fed to the Mapper: the data was sorted by key along the way, confirming stage 5 of the Map process.
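One caveat, as an assumption-flagged aside beyond the original experiment: commenting out setReducerClass leaves Hadoop's default identity Reducer in place, which passes every pair through unchanged (and is why the output is still sorted by key). For a genuinely map-only job, whose Mapper output is written straight to HDFS with no shuffle or sort, Hadoop provides a dedicated switch; the line below would go in main:

    // A truly map-only job: no shuffle, no sort; Mapper output goes straight
    // to the output files. (This differs from the experiment above, where the
    // default identity Reducer still ran and sorted the keys.)
    job.setNumReduceTasks(0);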
Original post: http://www.cnblogs.com/sunfie/p/4651609.html