1.1读取输入文件内容,接卸成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3对输出的key、value进行分区。
1.4对不同分区的数据,按照key进行排序、分区。相同key的value放到一个集合中。
1.5(可选)分组后的数据进行归纳。
2.1对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.3把reduce的输出保存到文件中。
===================================================================================
/*
* 1、分析业务逻辑,确定输入输出数据的样式
* 2、自定义一个类,这个类要继承mapper,重写map方法,实现具体业务逻辑,然后将新的key,value输出
* 3、自顶一个各类,这个类要继承reducer,重写reduce方法,实现具体业务逻辑,然后将新的key,value输出
* 4、将自定义的mapper和reducer通过job对象组装起来
*/map方法
package cn.intcast.hadoop.mr; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WCMapper extends Mapper<LongWritable, Text , Text, LongWritable>{ //重写map方法 shift+alt+s //注意序列化机制,jdk和hadoop中的序列化是不一样的,这里应采用hadoop中的类LongWritable,对应long;Text对应String @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //接收数据V1 String line = value.toString(); //切分数据 String[] words = line.split(" "); //循环 for(String w:words) { //出现一次,记录一次输出 context.write(new Text(w), new LongWritable(1)); } } }
reduce函数package cn.intcast.hadoop.mr; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException { //接收数据 Text k3 = k2; //定义一个计数器 long counter = 0; //循环v2s for(LongWritable i : v2s){ counter += i.get(); } //输出 context.write(k3, new LongWritable(counter)); } }WorldCount主函数
package cn.intcast.hadoop.mr; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WorldCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //构建一个job对象 Job job = Job.getInstance(new Configuration()); //注意:main方法所在的类 job.setJarByClass(WorldCount.class); //组装map和reduce方法 //设置mapper相关属性 job.setMapperClass(WCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, new Path("/user/guest/words.txt")); //设置reducer相关属性 job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path("/wcout0804")); job.waitForCompletion(true); } }
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/yaoxiaochuang/article/details/47264303