标签:
MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出。 Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中。
每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我们覆盖的map方法处理后,转换为很多的键值对再输出。整个Mapper任务的处理过程又可以分为以下几个阶段。
把Mapper任务的运行过程分为六个阶段。
每个Reducer任务是一个java进程。Reducer任务接收Mapper任务的输出,归约处理后写入到HDFS中,可以分为以下几个阶段。
在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。
在对Mapper任务、Reducer任务的分析过程中,会看到很多阶段都出现了键值对,读者容易混淆,所以这里对键值对进行编号,方便大家理解键值对的变化情况
对于Mapper任务输入的键值对,定义为key1和value1。在map方法中处理后,输出的键值对,定义为key2和 value2。reduce方法接收key2和value2,处理后,输出key3和value3。在下文讨论键值对时,可能把key1和value1简 写为<k1,v1>,key2和value2简写为<k2,v2>,key3和value3简写为<k3,v3>。
该业务要求统计指定文件中的所有单词的出现次数。
内容很简单,两行文本,每行的单词中间使用空格区分。
分析思路:最直观的想法是使用数据结构Map。解析文件中出现的每个单词,用单词作为key,出现次数作为value。这个思路没有问题,但是在大 数据环境下就不行了。我们需要使用MapReduce来做。根据Mapper任务和Reducer任务的运行阶段,我们知道在Mapper任务的第二阶段 是把文件的每一行转化成键值对,那么第三阶段的map方法就能取得每一行文本内容,我们可以在map方法统计本行文本中单词出现的次数,把每个单词的出现 次数作为新的键值对输出。在Reducer任务的第二阶段会对Mapper任务输出的键值对按照键进行排序,键相等的键值对会调用一次reduce方法。 在这里,“键”就是单词,“值”就是出现次数。因此可以在reduce方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把 这个结果输出。
看一下如何覆盖map方法
static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //key2 表示该行中的单词 final Text key2 = new Text(); //value2 表示单词在该行中的出现次数 final IntWritable value2 = new IntWritable(1); //key 表示文本行的起始位置,也即是偏移量 //value 表示文本行 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = value.toString().split(" "); for (String word : splited) { key2.set(word); //把key2、value2写入到context中 context.write(key2, value2); } }; }
上面代码中,注意Mapper类的泛型不是java的基本类型,而是Hadoop的数据类型LongWritable、Text、IntWritable。读者可以简单的等价为java的类long、String、int。下文会有专门讲解Hadoop的数据类型。
代码中Mapper类的泛型依次是<k1,v1,k2,v2>。map方法的第二个形参是行文本内容,是我们关心的。核心代码是把行文 本内容按照空格拆分,把每个单词作为新的键,数值1作为新的值,写入到上下文context中。在这里,因为输出的是每个单词,所以出现次数是常量1。如果一行文本中包括两个hello,会输出两次<hello,1>。
再来看一下如何覆盖reduce方法
static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ //value3表示单词出现的总次数 final IntWritable value3 = new IntWritable(0); /** * key 表示单词 * values 表示map方法输出的1的集合 * context 上下文对象 */ protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws Exception { int sum = 0; for (IntWritable count : values) { sum += count.get(); } //执行到这里,sum表示该单词出现的总次数 //key3表示单词,是最后输出的key final Text key3 = key; //value3表示单词出现的总次数,是最后输出的value value3.set(sum); context.write(key3, value3); }; }
上面代码中,Reducer类的四个泛型依次是<k2,v2,k3,v3>,要注意reduce方法的第二个参数是java.lang.Iterable类型,迭代的是v2。也就是k2相同的v2都可以迭代出来。
以上就是我们覆盖的map方法和reduce方法。现在要把我们的代码运行起来,需要写驱动代码,如下
/** * 驱动代码 */ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //输入路径 final String INPUT_PATH = "hdfs://hadoop0:9000/input"; //输出路径,必须是不存在的 final String OUTPUT_PATH = "hdfs://hadoop0:9000/output"; //创建一个job对象,封装运行时需要的所有信息 final Job job = new Job(new Configuration(),"WordCountApp"); //如果需要打成jar运行,需要下面这句 job.setJarByClass(WordCountApp.class); //告诉job执行作业时输入文件的路径 FileInputFormat.setInputPaths(job, INPUT_PATH); //设置把输入文件处理成键值对的类 job.setInputFormatClass(TextInputFormat.class); //设置自定义的Mapper类 job.setMapperClass(MyMapper.class); //设置map方法输出的k2、v2的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置对k2分区的类 job.setPartitionerClass(HashPartitioner.class); //设置运行的Reducer任务的数量 job.setNumReduceTasks(1); //设置自定义的Reducer类 job.setReducerClass(MyReducer.class); //设置reduce方法输出的k3、v3的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告诉job执行作业时的输出路径 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //指明输出的k3类型 job.setOutputKeyClass(Text.class); //指明输出的v3类型 job.setOutputValueClass(IntWritable.class); //让作业运行,直到运行结束,程序退出 job.waitForCompletion(true); }
在以上代码中,我们创建了一个job对象,这个对象封装了我们的任务,可以提交到Hadoop独立运行。最后一句job.waitForCompletion(true),表示把job对象提交给Hadoop运行,直到作业运行结束后才可以。
以上代码的运行方式有两种,一种是在宿主机的eclipse环境中运行,一种是打成jar包在linux中运行。
第一种运行方式要求宿主机能够访问linux,并且对于输入路径和输出路径中的主机名hadoop0,要在宿主机的hosts文件中有绑定,笔者的hosts文件位于C:\WINDOWS\system32\drivers\etc文件夹。
第二种运行方式,需要把代码打成jar包,在linux下执行命令hadoop jar xxx.jar运行
运行结束后,文件路径在hdfs://hadoop0:9000/output/part-r-00000。
标签:
原文地址:http://www.cnblogs.com/itboys/p/5696933.html