标签:
开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密。这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候,总是感觉自己的理解肤浅,今天我开始写mapreduce了,今天写文章时候比上周要进步多,不过到底能不能写好本文了,只有试试再说了。 Mapreduce初析 Mapreduce是一个计算框架,既然是做计算的框架,那么表现形式就是有个输入(input),mapreduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output),这个输出就是我们所需要的结果。 我们要学习的就是这个计算模型的运行规则。在运行一个mapreduce计算任务时候,任务过程被分为两个阶段:map阶段和reduce阶段,每个阶段都是用键值对(key/value)作为输入(input)和输出(output)。而程序员要做的就是定义好这两个阶段的函数:map函数和reduce函数。 Mapreduce的基础实例 讲解mapreduce运行原理前,首先我们看看mapreduce里的hello world实例WordCount,这个实例在任何一个版本的hadoop安装程序里都会有,大家很容易找到,这里我还是贴出代码,便于我后面的讲解,代码如下: package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 如何运行它,这里不做累述了,大伙可以百度下,网上这方面的资料很多。这里的实例代码是使用新的api,大家可能在很多书籍里看到讲解mapreduce的WordCount实例都是老版本的api,这里我不给出老版本的api,因为老版本的api不太建议使用了,大家做开发最好使用新版本的api,新版本api和旧版本api有区别在哪里: 新的api放在:org.apache.hadoop.mapreduce,旧版api放在:org.apache.hadoop.mapred 新版api使用虚类,而旧版的使用的是接口,虚类更加利于扩展,这个是一个经验,大家可以好好学习下hadoop的这个经验。 其他还有很多区别,都是说明新版本api的优势,因为我提倡使用新版api,这里就不讲这些,因为没必要再用旧版本,因此这种比较也没啥意义了。 下面我对代码做简单的讲解,大家看到要写一个mapreduce程序,我们的实现一个map函数和reduce函数。我们看看map的方法: public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…} 这里有三个参数,前面两个Object key, Text value就是输入的key和value,第三个参数Context context这是可以记录输入的key和value,例如:context.write(word, one);此外context还会记录map运算的状态。 对于reduce函数的方法: public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…} reduce函数的输入也是一个key/value的形式,不过它的value是一个迭代器的形式Iterable<IntWritable> values,也就是说reduce的输入是一个key对应一组的值的value,reduce也有context和map的context作用一致。 至于计算的逻辑就是程序员自己去实现了。 下面就是main函数的调用了,这个我要详细讲述下,首先是: Configuration conf = new Configuration(); 运行mapreduce程序前都要初始化Configuration,该类主要是读取mapreduce系统配置信息,这些信息包括hdfs还有mapreduce,也就是安装hadoop时候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解为啥要这么做,这个是没有深入思考mapreduce计算框架造成,我们程序员开发mapreduce时候只是在填空,在map函数和reduce函数里编写实际进行的业务逻辑,其它的工作都是交给mapreduce框架自己操作的,但是至少我们要告诉它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而这些信息就在conf包下的配置文件里。 接下来的代码是: String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } If的语句好理解,就是运行WordCount程序时候一定是两个参数,如果不是就会报错退出。至于第一句里的GenericOptionsParser类,它是用来解释常用hadoop命令,并根据需要为Configuration对象设置相应的值,其实平时开发里我们不太常用它,而是让类实现Tool接口,然后再main函数里使用ToolRunner运行程序,而ToolRunner内部会调用GenericOptionsParser。 接下来的代码是: Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); 第一行就是在构建一个job,在mapreduce框架里一个mapreduce任务也叫mapreduce作业也叫做一个mapreduce的job,而具体的map和reduce运算就是task了,这里我们构建一个job,构建时候有两个参数,一个是conf这个就不累述了,一个是这个job的名称。 第二行就是装载程序员编写好的计算程序,例如我们的程序类名就是WordCount了。这里我要做下纠正,虽然我们编写mapreduce程序只需要实现map函数和reduce函数,但是实际开发我们要实现三个类,第三个类是为了配置mapreduce如何运行map和reduce函数,准确的说就是构建一个mapreduce能执行的job了,例如WordCount类。 第三行和第五行就是装载map函数和reduce函数实现类了,这里多了个第四行,这个是装载Combiner类,这个我后面讲mapreduce运行机制时候会讲述,其实本例去掉第四行也没有关系,但是使用了第四行理论上运行效率会更好。 接下来的代码: job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 这个是定义输出的key/value的类型,也就是最终存储在hdfs上结果文件的key/value的类型。 最后的代码是: FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); 第一行就是构建输入的数据文件,第二行是构建输出的数据文件,最后一行如果job运行成功了,我们的程序就会正常退出。FileInputFormat和FileOutputFormat是很有学问的,我会在下面的mapreduce运行机制里讲解到它们。 好了,mapreduce里的hello word程序讲解完毕,我这个讲解是从新办api进行,这套讲解在网络上还是比较少的,应该很具有代表性的。 Mapreduce运行机制 下面我要讲讲mapreduce的运行机制了,前不久我为公司出了一套hadoop面试题,里面就问道了mapreduce运行机制,出题时候我发现这个问题我自己似乎也将不太清楚,因此最近几天恶补了下,希望在本文里能说清楚这个问题。 下面我贴出几张图,这些图都是我在百度图片里找到的比较好的图片: 图片一:
标签:
原文地址:http://www.cnblogs.com/zhangwentong/p/5674301.html