标签:
MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"。
在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。
在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
MapReduce处理大数据集的过程
单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的"src/examples"目录下找到。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如下图所示。
1)源代码程序
1 package org.apache.hadoop.examples; 2 3 import java.io.IOException; 4 5 import java.util.StringTokenizer; 6 7 import org.apache.hadoop.conf.Configuration; 8 9 import org.apache.hadoop.fs.Path; 10 11 import org.apache.hadoop.io.IntWritable; 12 13 import org.apache.hadoop.io.Text; 14 15 import org.apache.hadoop.mapreduce.Job; 16 17 import org.apache.hadoop.mapreduce.Mapper; 18 19 import org.apache.hadoop.mapreduce.Reducer; 20 21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 22 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 24 25 import org.apache.hadoop.util.GenericOptionsParser; 26 27 public class WordCount { 28 29 public static class TokenizerMapper 30 31 extends Mapper<Object, Text, Text, IntWritable>{ 32 33 private final static IntWritable one = new IntWritable(1); 34 35 private Text word = new Text(); 36 37 38 39 public void map(Object key, Text value, Context context) 40 41 throws IOException, InterruptedException { 42 43 StringTokenizer itr = new StringTokenizer(value.toString()); 44 45 while (itr.hasMoreTokens()) { 46 47 word.set(itr.nextToken()); 48 49 context.write(word, one); 50 51 } 52 53 } 54 55 } 56 57 public static class IntSumReducer 58 59 extends Reducer<Text,IntWritable,Text,IntWritable> { 60 61 private IntWritable result = new IntWritable(); 62 63 public void reduce(Text key, Iterable<IntWritable> values,Context context) 64 65 throws IOException, InterruptedException { 66 67 int sum = 0; 68 69 for (IntWritable val : values) { 70 71 sum += val.get(); 72 73 } 74 75 result.set(sum); 76 77 context.write(key, result); 78 79 } 80 81 } 82 83 84 85 public static void main(String[] args) throws Exception { 86 87 Configuration conf = new Configuration(); 88 89 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 90 91 if (otherArgs.length != 2) { 92 93 System.err.println("Usage: wordcount <in> <out>"); 94 95 System.exit(2); 96 97 } 98 99 Job job = new Job(conf, "word count"); 100 101 job.setJarByClass(WordCount.class); 102 103 job.setMapperClass(TokenizerMapper.class); 104 105 job.setCombinerClass(IntSumReducer.class); 106 107 job.setReducerClass(IntSumReducer.class); 108 109 job.setOutputKeyClass(Text.class); 110 111 job.setOutputValueClass(IntWritable.class); 112 113 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 114 115 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 116 117 System.exit(job.waitForCompletion(true) ? 0 : 1); 118 119 } 120 121 }
2)Map过程
1 public static class TokenizerMapper 2 3 extends Mapper<Object, Text, Text, IntWritable>{ 4 5 private final static IntWritable one = new IntWritable(1); 6 7 private Text word = new Text(); 8 9 public void map(Object key, Text value, Context context) 10 11 throws IOException, InterruptedException { 12 13 StringTokenizer itr = new StringTokenizer(value.toString()); 14 15 while (itr.hasMoreTokens()) { 16 17 word.set(itr.nextToken()); 18 19 context.write(word, one); 20 21 } 22 23 }
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。
3)Reduce过程
1 public static class IntSumReducer 2 3 extends Reducer<Text,IntWritable,Text,IntWritable> { 4 5 private IntWritable result = new IntWritable(); 6 7 public void reduce(Text key, Iterable<IntWritable> values,Context context) 8 9 throws IOException, InterruptedException { 10 11 int sum = 0; 12 13 for (IntWritable val : values) { 14 15 sum += val.get(); 16 17 } 18 19 result.set(sum); 20 21 context.write(key, result); 22 23 } 24 25 }
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
4)执行MapReduce任务
1 public static void main(String[] args) throws Exception { 2 3 Configuration conf = new Configuration(); 4 5 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 6 7 if (otherArgs.length != 2) { 8 9 System.err.println("Usage: wordcount <in> <out>"); 10 11 System.exit(2); 12 13 } 14 15 Job job = new Job(conf, "word count"); 16 17 job.setJarByClass(WordCount.class); 18 19 job.setMapperClass(TokenizerMapper.class); 20 21 job.setCombinerClass(IntSumReducer.class); 22 23 job.setReducerClass(IntSumReducer.class); 24 25 job.setOutputKeyClass(Text.class); 26 27 job.setOutputValueClass(IntWritable.class); 28 29 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 30 31 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 32 33 System.exit(job.waitForCompletion(true) ? 0 : 1); 34 35 }
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
图4-1 分割过程
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。
图4-2 执行map方法
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。
图4-3 Map端排序及Combine过程
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示
图4-4 Reduce端排序及输出结果
新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。
标签:
原文地址:http://www.cnblogs.com/shufy/p/5357688.html