使用到Combiner编程(可插拔式)
在map端对输出先做合并,最基本是实现本地key合并,具有本地reduce功能
如果不用combiner,所有结果都由reduce完成,效率会低下
Combiner的输入输出类型应该完全一致(实现如累加,最大值等功能)
job.setCombinerClass();
倒排索引基本实现
package cn.MapReduce.px;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Basic inverted index ("dao pai suo yin"): for every word in the input,
 * produces one output line listing each source file together with how many
 * times the word occurred in that file, e.g.
 *
 *   hello   hdfs://.../a.txt->3   hdfs://.../b.txt->1
 *
 * Pipeline: Mapper emits ("word->path", "1"); the Combiner sums the counts
 * per (word, path) on the map side and re-keys to ("word", "path->count");
 * the Reducer concatenates all "path->count" entries for each word.
 */
public class InverIndex {

    /** Emits ("word->filePath", "1") for every word occurrence. */
    public static class InverIndexMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text k2 = new Text();
        private final Text v2 = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Identify which input file this split belongs to, so the word
            // can be tagged with its source document.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String path = inputSplit.getPath().toString();
            String[] words = value.toString().split(" ");
            for (String word : words) {
                if (word.isEmpty()) {
                    continue; // skip empty tokens from consecutive spaces
                }
                k2.set(word + "->" + path);
                v2.set("1");
                context.write(k2, v2);
            }
        }
    }

    /**
     * Map-side combiner: sums the "1" values for each "word->path" key and
     * re-keys the output to ("word", "path->count").
     *
     * NOTE(review): changing the key in a combiner violates the usual
     * combiner contract (input and output types/keys should be identical,
     * since a combiner may run zero or more times). It works in this classic
     * example because the re-keyed pairs still partition consistently by
     * word, but it is fragile — confirm before reusing this pattern.
     */
    public static class InverIndexCombiner extends Reducer<Text, Text, Text, Text> {
        private final Text k4 = new Text();
        private final Text v4 = new Text();

        @Override
        protected void reduce(Text k3, Iterable<Text> v3s, Context context)
                throws IOException, InterruptedException {
            // Key arrives as "word->path"; limit the split to 2 so a path
            // that happens to contain "->" stays intact.
            String[] parts = k3.toString().split("->", 2);
            String word = parts[0];
            String path = parts[1];
            int count = 0; // primitive int: avoids autoboxing in the loop
            for (Text t : v3s) {
                count += Integer.parseInt(t.toString());
            }
            k4.set(word);
            v4.set(path + "->" + count);
            context.write(k4, v4);
        }
    }

    /** Joins all "path->count" entries of one word into a single tab-separated line. */
    public static class InverIndexReduce extends Reducer<Text, Text, Text, Text> {
        private final Text v6 = new Text();

        @Override
        protected void reduce(Text k5, Iterable<Text> v5s, Context context)
                throws IOException, InterruptedException {
            // StringBuilder instead of String += : linear instead of
            // quadratic concatenation; also drops the trailing tab the
            // original left at the end of every line.
            StringBuilder result = new StringBuilder();
            for (Text t : v5s) {
                if (result.length() > 0) {
                    result.append('\t');
                }
                result.append(t.toString());
            }
            v6.set(result.toString());
            context.write(k5, v6);
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(InverIndex.class);

        job.setMapperClass(InverIndexMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setCombinerClass(InverIndexCombiner.class);

        job.setReducerClass(InverIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result as the process exit code instead of
        // silently discarding waitForCompletion's boolean.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
原文地址:http://blog.csdn.net/u010366796/article/details/44702361