标签:ret xtend org cer wordcount reduce util add cin
1、数据准备:
Mike,35
Steven,40
Ken,28
Cindy,32
2、预期结果(Avg 采用整数除法:135 / 4 = 33.75,取整后为 33)
Max 40
Min 28
Avg 33
3、MapReduce代码如下
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class AgeMapReduce { public static class WordCountMapper extends Mapper<Object, Text, Text, Text> { private Text nameKey = new Text(); private Text ageValue = new Text(); @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { String content = itr.nextToken(); String[] nameAndAge = content.split(","); //String name = nameAndAge[0]; String age = nameAndAge[1]; nameKey.set("only you"); ageValue.set(age); context.write(nameKey, ageValue); } } } public static class WordCountReduce extends Reducer<Text, Text, Text, Text> { private int min = Integer.MAX_VALUE; private int max = 0; private int sum = 0; private int count = 0; @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text tmpAge : values) { int age = Integer.valueOf(tmpAge.toString()); if (age < min) { min = age; } if (age > max) { max = age; } sum += age; count++; } //String resultStr = min + "\t" + max + "\t" + (sum / count); //result.set(resultStr); context.write(new Text("Max"), new Text(String.valueOf(min))); context.write(new Text("Min"), new Text(String.valueOf(max))); context.write(new Text("Avg"), new Text(String.valueOf(sum/count))); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) 
.getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: MinMaxCountDriver <in> <out>"); System.exit(2); } Job job = new Job(conf, "StackOverflow Comment Date Min Max Count"); job.setJarByClass(AgeMapReduce.class); job.setMapperClass(WordCountMapper.class); // job.setCombinerClass(MusicReduce.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // user/joe/wordcount/input FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // user/joe/wordcount/output FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
4、注意事项
因为输出结果与 Key 无关,所以在 map 阶段使用一个固定的 Key 即可;这样所有记录都会被分到同一组,由同一次 reduce 调用统一计算 max、min 和 avg。
Hadoop实战-MapReduce之max、min、avg统计(六)
标签:ret xtend org cer wordcount reduce util add cin
原文地址:http://www.cnblogs.com/qq27271609/p/6822849.html