As is well known, the Hadoop framework uses a Mapper to turn the input data into <key, value> pairs, shuffles those pairs between the cluster's nodes, and then uses a Reducer to aggregate the data and produce the final output.
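For example, if several Mappers each emit pairs such as <"US", 3> and <"US", 5>, the shuffle guarantees that every value for the key "US" arrives at the same Reducer, which can then aggregate them into a single output pair for that key.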
This process exposes at least two performance bottlenecks: the network bandwidth consumed by shuffling every Mapper's output across nodes, and the aggregation load that lands on the Reducers.
Hadoop addresses these bottlenecks with a Combiner step that sits between the Mapper and the Reducer. You can think of the Combiner as a helper for the Reducer: its job is to shrink the Mapper's output, reducing both the network bandwidth used and the load placed on the Reducers. If we define a Combiner, the MapReduce framework may apply it to the intermediate data any number of times.
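Because the framework may run the Combiner zero, one, or several times on any subset of the map output, its input and output key/value types must be identical to the Mapper's output types. A minimal sketch of the wiring (the class names here are illustrative):

job.setMapperClass(MyMapper.class);     // emits <Text, Text>
job.setCombinerClass(MyCombiner.class); // a Reducer<Text,Text,Text,Text>: same types in and out
job.setReducerClass(MyReducer.class);   // consumes <Text, Text>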
If the Reducer computes only a simple distributive function, such as maximum, minimum, or count, we can use the Reducer itself as the Combiner. But many useful functions are not distributive. Below we use computing an average as an example:
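For instance, a maximum is distributive: the maximum of partial maxima is the global maximum, so the same class can serve as both the Reducer and the Combiner. A minimal sketch, with illustrative class names:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = Long.MIN_VALUE;
        for (LongWritable value : values) {
            max = Math.max(max, value.get()); // max of partial maxima equals the global max
        }
        context.write(key, new LongWritable(max));
    }
}

// Input and output types match, so one class can be registered in both roles:
// job.setReducerClass(MaxReduce.class);
// job.setCombinerClass(MaxReduce.class);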
Each Mapper emits the <key, value> pairs it processes, and the Reducer sorts and sums the pairs it receives so that the average for each key can be computed on a single node.
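To see why the average is not distributive, suppose one Mapper's node holds the values 1 and 2 and another holds 3, 4, and 5. The true mean is (1 + 2 + 3 + 4 + 5) / 5 = 3, but averaging the partial averages gives (1.5 + 4) / 2 = 2.75. A Combiner therefore cannot simply emit a partial average; it must also carry the count of values that went into it.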
Because the Reducer treats the number of <key, value> pairs it receives as the number of pairs in the input data, the main obstacle to using a Combiner here is the count: pre-aggregating values would hide how many of them there were. We can rewrite the MapReduce program to track the count explicitly:
package com;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AveragingWithCombiner extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        static enum ClaimsCounters { MISSING, QUOTED };

        // Map method: emit <country, "numClaims,1"> so that both the running
        // sum and the running count survive partial aggregation in the Combiner.
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A negative limit keeps trailing empty fields in the split.
            String[] fields = value.toString().split(",", -20);
            String country = fields[4];
            String numClaims = fields[8];
            if (numClaims.length() > 0 && !numClaims.startsWith("\"")) {
                context.write(new Text(country), new Text(numClaims + ",1"));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, DoubleWritable> {

        // Reduce method: add up the partial sums and partial counts,
        // then divide exactly once at the end.
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    public static class Combine extends Reducer<Text, Text, Text, Text> {

        // Same aggregation as the Reducer, but the output stays in the
        // "sum,count" format so it can be combined again or reduced later.
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (Text value : values) {
                String[] fields = value.toString().split(",");
                sum += Double.parseDouble(fields[0]);
                count += Integer.parseInt(fields[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    // Configure and run the job.
    public int run(String[] args) throws Exception {
        // Use the configuration ToolRunner prepared for us.
        Job job = new Job(getConf());
        job.setJarByClass(AveragingWithCombiner.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJobName("AveragingWithCombiner");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combine.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // The map output is <Text, Text>; the final output is <Text, DoubleWritable>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AveragingWithCombiner(), args);
        System.exit(res);
    }
}
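Assuming the compiled classes are packaged into a jar (the jar name and paths below are illustrative), the job is launched with the standard hadoop jar command; args[0] is an input directory of comma-separated records (the 5th field holds the country, the 9th the claim count) and args[1] is the output directory:

hadoop jar averaging.jar com.AveragingWithCombiner /patents/input /patents/output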
Hadoop: Using a Combiner to Improve Map/Reduce Job Efficiency
Original article: http://www.cnblogs.com/soaringEveryday/p/4063385.html