码迷,mamicode.com
首页 > 编程语言 > 详细

MapReduce中的join算法-reduce端join

时间:2016-05-12 12:36:18      阅读:125      评论:0      收藏:0      [点我收藏+]

标签:

   在海量数据的环境下,不可避免的会碰到join需求, 例如在数据分析时需要连接从不同的数据源中获取到数据。
假设有两个数据集:气象站数据库和天气记录数据库,并考虑如何合二为一。
一个典型的查询是:输出气象站的历史信息,同时各行记录也包含气象站的元数据信息。

气象站和天气记录的示例数据分别如下所示:
Station ID            Station Name
011990-99999    SIHCCAJAVRI
012650-99999    TRNSET-HANSMOEN

Station ID            Timestamp    Temperature
012650-99999    194903241200    111
012650-99999    194903241800    78
011990-99999    195005150700    0
011990-99999    195005151200    22
011990-99999    195005151800    -11


气象站和天气记录合并之后的示意图如下所示。
Station ID    Station Name    Timestamp    Temperature
011990-99999    SIHCCAJAVRI    195005150700    0
011990-99999    SIHCCAJAVRI    195005151200    22
011990-99999    SIHCCAJAVRI    195005151800    -11
012650-99999    TYNSET-HANSMOEN    194903241200    111
012650-99999    TYNSET-HANSMOEN    194903241800    78

Reducer端连接:
基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。

 我们通过下面两种技术实现 reduce 端连接。

        1、多输入

        数据集的输入源往往有多种格式,因此可以使用 MultipleInputs 类来方便地解析和标注各个数据源。

        2、二次排序

reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer 会非常重要。 
以上面的天气数据连接为例,当天气记录发送到 reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer ,使得 reducer 能够将气象站名称填到天气记录之中就马上输出。 
虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer 的可用内存容量。 
因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果

---------------------------------------------------------------------------------------


程序示例:
[主类]
publicclassJoinRecordWithStationName extends Configured implements Tool{
     /**
      * 气象站 mapper 标记为“0”,先到达reducer
      */
     publicstaticclass JoinStationMapper extends Mapper< LongWritable,Text,TextPair,Text>{ 
         protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
             String line = value.toString();
             String[] arr = line.split( "\\s+" );//解析气象站数据 012650-99999194903241200  111
             int length = arr.length ;
           if (length==2){//满足这种数据格式
                //key=气象站id  value=气象站名称
                 context.write( new TextPair(arr[0],"0" ),new Text(arr[1]));
             }
         }
     }
     /**
      * 天气记录 mapper 标记为“1”,后到达reducer
      */
     publicstaticclass JoinRecordMapper extends Mapper< LongWritable,Text,TextPair,Text>{
         protectedvoid map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
          String line = value.toString();
          String[] arr = line.split( "\\s+" ,2);//解析天气记录数据
           int length = arr.length ;
           if (length==2){
                //key=气象站id  value=天气记录数据
               context.write( new TextPair(arr[0],"1" ),new Text(arr[1]));
          } 
         }
     }
     
     /**
      *通过上面的分组,将相同的气象站id分到同一个reducer中进行输出
      *
      *由于 TextPair 经过了二次排序,所以 reducer 会先接收到气象站数据。
     *因此从中抽取气象站名称,并将其作为后续每条输出记录的一部分写到输出文件
      */
     publicstaticclass JoinReducer extends Reducer< TextPair,Text,Text,Text>{
         protectedvoid reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{
             Iterator< Text> iter = values.iterator();
             Text stationName = new Text(iter.next());//气象站名称   SIHCCAJAVRI
            
             while (iter.hasNext()){
                 Text record = iter.next(); //天气记录的每条数据   195005151200   22
                 Text outValue = new Text(stationName.toString()+"\t" +record.toString());
                 context.write(key.getFirst(),outValue);
             }
         }       
     }

 //输出结果:
//   011990-99999  SIHCCAJAVRI   195005151800  -11
//   011990-99999  SIHCCAJAVRI   195005151200  22
//   011990-99999  SIHCCAJAVRI   195005150700  0
//   012650-99999  TRNSET-HANSMOEN    194903241800  78
//   012650-99999  TRNSET-HANSMOEN    194903241200  111
     
     /**
      * 自定义分区方法,将气象站id相同的记录分到相同的reducer中
      *
      */
     publicstaticclass KeyPartitioner extends Partitioner< TextPair,Text>{
           publicint getPartition(TextPair key,Text value,int numPartitions){
               //根据气象站id进行选择分区,而不是组合键的整体
               return (key.getFirst().hashCode()&Integer.MAX_VALUE ) % numPartitions;
          }
     }
     

     
     /**
      * 自定义分组,将气象站id相同的key放到同一个reducer中执行。然后再通过TextPair进行内部比较排序
      */
     publicstaticclass GroupingComparator extends WritableComparator{
           public GroupingComparator() {
               super (TextPair.class,true);
          }
          
           @SuppressWarnings ("rawtypes" )
           @Override
           publicint compare(WritableComparable w1, WritableComparable w2) {
              TextPair tp1=(TextPair)w1;
              TextPair tp2=(TextPair)w2;
              Text f1= tp1.getFirst();
              Text f2= tp2.getFirst();
               return f1.compareTo(f2);
          }
     }
     
     
     @Override
     publicint run(String[] args) throws Exception{
          Configuration conf = new Configuration();// 读取配置文件
          
          Job job = Job. getInstance();// 新建一个任务
          job.setJarByClass(JoinRecordWithStationName. class );// 主类
          
          Path recordInputPath = new Path(args[0]);//天气记录数据源
          Path stationInputPath = new Path(args[1]);//气象站数据源
          
          Path outputPath = new Path(args[2]);//输出路径
           //如果输出路径存在就删除
          FileSystem fs = outputPath.getFileSystem(conf);
           if (fs.isDirectory(outputPath)){
              fs.delete(outputPath, true );
          }
          
          MultipleInputs. addInputPath(job,recordInputPath,TextInputFormat. class ,JoinRecordMapper.class);//读取天气记录Mapper
          MultipleInputs. addInputPath(job,stationInputPath,TextInputFormat. class ,JoinStationMapper.class);//读取气象站Mapper
          
          FileOutputFormat. setOutputPath(job,outputPath);
          
          job.setPartitionerClass(KeyPartitioner. class );//自定义分区
          job.setGroupingComparatorClass(GroupingComparator. class );//自定义分组
          
          job.setMapOutputKeyClass(TextPairclass );
          
          job.setReducerClass(JoinReducer. class );// Reducer
          
          job.setOutputKeyClass(Text. class );
          
           return job.waitForCompletion(true)?0:1;
          
          }
          
           publicstaticvoid main(String[] args) throws Exception{
              String args0[]={
                         "hdfs://yun-11:9000/join/records.txt" ,
                         "hdfs://yun-11:9000/join/station.txt" ,
                         "hdfs://yun-11:9000/join/out"      };
               int exitCode = ToolRunner.runnew JoinRecordWithStationName(),args0);
              System. exit(exitCode);
     }

}

自定义组合键:TextPair 
我们使用 TextPair 类构建组合键,包括气象站 ID 和 “标记”。在这里,“标记” 是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气记录先到达
一种简单的做法就是:对于气象站记录, “标记” 的值设为 0;对于天气记录,“标记” 的值设为1
------------------------------------------------------------------------------------------------------------------------------
package com.tan.join1;
import java.io.*;
import org.apache.hadoop.io.*;
publicclassTextPair implements WritableComparable<TextPair> {

     private   Text first//Text 类型的实例变量 first --气象站id
     private   Text second//Text 类型的实例变量 second--标记符号
     
     public TextPair() {
          set( new Text(),new Text());
     }
     
     public TextPair(String first, String second) {
          set( new Text(first),new Text(second));
     }
     
     public TextPair(Text first, Text second) {
          set(first, second);
     }
     
     publicvoid set(Text first, Text second) {
           this .first = first;
           this .second = second;
     }
     
     public Text getFirst() {
           returnfirst ;
     }
     
     public Text getSecond() {
           returnsecond ;
     }
     
     //将对象转换为字节流并写入到输出流out中
     @Override
     publicvoid write(DataOutput out)throws IOException {
           first.write(out);
           second.write(out);
     }
     
     //从输入流in中读取字节流反序列化为对象
     @Override
     publicvoid readFields(DataInput in)throws IOException {
           first.readFields(in);
           second.readFields(in);
     }
     
     @Override
     publicint hashCode() {
           returnfirst .hashCode() *163+ second.hashCode();
     }
     
     
     @Override
     publicboolean equals(Object o) {
           if (o instanceof TextPair) {
              TextPair tp = (TextPair) o;
               returnfirst .equals(tp.first) && second .equals(tp.second );
          }
               returnfalse ;
     }
     
     @Override
     public String toString() {
           returnfirst +"\t" + second ;
     }
     
     //排序
     @Override
     publicint compareTo(TextPair tp) {
           //根据第一个字段进行对比,如果相等再根据第二个字段进行排序
           if (!first .equals(tp.first)) {
               returnfirst .compareTo(tp.first);
          } elseif (!second .equals(tp.second)){
               returnsecond .compareTo(tp.second);
          }
           return 0;
     }
}





 Map端连接
---------------------------------------------------------------------------------------

   在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。 

各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。

 map 端连接操作可以连接多个作业的输出,只要这些作业的 reducer 数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。 


在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,天气记录文件也以气象站 ID 部分排序,而且 reducer 的数量相同,则就满足了执行 map 端连接的前提条件。

利用 org.apache.hadoop.mapreduce.join 包中的 CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat 类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。


待补充..

MapReduce中的join算法-reduce端join

标签:

原文地址:http://blog.csdn.net/u010834071/article/details/51365642

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!