码迷,mamicode.com
首页 > 其他好文 > 详细

MapReduce求平均值

时间:2015-12-15 13:59:21      阅读:163      评论:0      收藏:0      [点我收藏+]

标签:

一:背景

求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,Reduce端汇总并且统计记录数,然后作商即可。

 

二:技术实现

#需求:现有成绩单如下,求出每个同学的平均成绩

 

[java] view plaincopy
 
  1. 小民  语文  80  
  2. 小民  数学  98  
  3. 小民  英语  89  
  4. 小芳  语文  88  
  5. 小芳  数学  99  
  6. 小芳  英语  90  


实现代码:

 

 

[java] view plaincopy
 
  1. public class AverageTest {  
  2.         // 定义输入路径  
  3.         private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/average_file";  
  4.         // 定义输出路径  
  5.         private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  6.   
  7.         public static void main(String[] args) {  
  8.   
  9.             try {  
  10.                 // 创建配置信息  
  11.                 Configuration conf = new Configuration();  
  12.                   
  13.                 // 创建文件系统  
  14.                 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  15.                 // 如果输出目录存在,我们就删除  
  16.                 if (fileSystem.exists(new Path(OUT_PATH))) {  
  17.                     fileSystem.delete(new Path(OUT_PATH), true);  
  18.                 }  
  19.   
  20.                 // 创建任务  
  21.                 Job job = new Job(conf, AverageTest.class.getName());  
  22.   
  23.                 //1.1   设置输入目录和设置输入数据格式化的类  
  24.                 FileInputFormat.setInputPaths(job, INPUT_PATH);  
  25.                 job.setInputFormatClass(TextInputFormat.class);  
  26.   
  27.                 //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  28.                 job.setMapperClass(AverageMapper.class);  
  29.                 job.setMapOutputKeyClass(Text.class);  
  30.                 job.setMapOutputValueClass(Text.class);  
  31.   
  32.                 //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  33.                 job.setPartitionerClass(HashPartitioner.class);  
  34.                 job.setNumReduceTasks(1);  
  35.   
  36.                 //1.4   排序  
  37.                 //1.5   归约  
  38.                 //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
  39.                 //2.2   指定Reducer类和输出key和value的类型  
  40.                 job.setReducerClass(AverageReducer.class);  
  41.                 job.setOutputKeyClass(Text.class);  
  42.                 job.setOutputValueClass(FloatWritable.class);  
  43.   
  44.                 //2.3   指定输出的路径和设置输出的格式化类  
  45.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  46.                 job.setOutputFormatClass(TextOutputFormat.class);  
  47.   
  48.   
  49.                 // 提交作业 退出  
  50.                 System.exit(job.waitForCompletion(true) ? 0 : 1);  
  51.               
  52.             } catch (Exception e) {  
  53.                 e.printStackTrace();  
  54.             }  
  55.         }  
  56.   
  57.     public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text>{  
  58.         //设置输出的key和value  
  59.         private Text outKey = new Text();  
  60.         private Text outValue = new Text();  
  61.         @Override  
  62.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {  
  63.           
  64.             //获取输入的行  
  65.             String line = value.toString();  
  66.             //取出无效记录  
  67.             if (line == null || line.equals("")){  
  68.                 return ;  
  69.             }  
  70.             //对数据进行切分  
  71.             String[] splits = line.split("\t");  
  72.               
  73.             //截取姓名和成绩  
  74.             String name = splits[0];  
  75.             String score = splits[2];  
  76.             //设置输出的Key和value  
  77.             outKey.set(name);  
  78.             outValue.set(score);  
  79.             //将结果写出去  
  80.             context.write(outKey, outValue);  
  81.               
  82.         }  
  83.           
  84.     }  
  85.       
  86.     public static class AverageReducer extends Reducer<Text, Text, Text, FloatWritable>{  
  87.         //定义写出去的Key和value  
  88.         private Text name = new Text();  
  89.         private FloatWritable avg = new FloatWritable();  
  90.         @Override  
  91.         protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, FloatWritable>.Context context) throws IOException, InterruptedException {  
  92.             //定义科目数量  
  93.             int courseCount = 0;  
  94.             //定义中成绩  
  95.             int sum = 0;  
  96.             //定义平均分  
  97.             float average = 0;  
  98.               
  99.             //遍历集合求总成绩  
  100.             for (Text val : value){  
  101.                 sum += Integer.parseInt(val.toString());  
  102.                 courseCount ++;  
  103.             }  
  104.               
  105.             //求平均成绩  
  106.             average = sum / courseCount;  
  107.               
  108.             //设置写出去的名字和成绩  
  109.             name.set(key);  
  110.             avg.set(average);  
  111.               
  112.             //把结果写出去  
  113.             context.write(name, avg);  
  114.         }  
  115.     }  
  116. }  

 

 

程序运行的结果:

技术分享

MapReduce求平均值

标签:

原文地址:http://www.cnblogs.com/thinkpad/p/5047730.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!