码迷,mamicode.com
首页 > 其他好文 > 详细

Reduce侧联接

时间:2015-08-26 22:27:02      阅读:315      评论:0      收藏:0      [点我收藏+]

标签:datajoinma   reduce侧联接   

案例分析前提,了解其原理,以及术语

术语部分:
 1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式)
2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中。在这个目的下,我们将使用每个record自身的Data source名称标记每个record。
3.Group KeyGroup Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Customer ID(第一列的3)。由于datajoin包允许用户自定义group key,所以其较之关系数据库中的join key更一般、平常。

原理部分:
原理:

1、mapper端输入后,将数据封装成TaggedMapOutput类型,此类型封装数据源(tag)和值(value);

2、map阶段输出的结果不在是简单的一条数据,而是一条记录。记录=数据源(tag)+数据值(value).

3、combine接收的是一个组合:不同数据源却有相同组键的值;

4、不同数据源的每一条记录只能在一个combine中出现;
如图:
技术分享
技术分享


1.利用datajoin包来实现join:
---------------------
Hadoop的datajoin包中有三个需要我们继承的类:DataJoinMapperBase,DataJoinReducerBase,TaggedMapOutput。正如其名字一样,我们的MapClass将会扩展DataJoinMapperBase,Reduce类会扩展DataJoinReducerBase。这个datajoin包已经实现了map()和reduce()方法,因此我们的子类只需要实现一些新方法来设置一些细节。

  

  在用DataJoinMapperBase和DataJoinReducerBase之前,我们需要弄清楚我们贯穿整个程序使用的新的虚数据类TaggedMapOutput。

  

  根据之前我们在图Advance MapReduce的数据流中所展示的那样,mapper输出一个包(由一个key和一个value(tagged record)组成)。datajoin包将key设置为Text类型,将value设置为TaggedMapOutput类型(TaggedMapOutput是一个将我们的记录使用一个Text类型的tag包装起来的数据类型)。它实现了getTag()和setTag(Text tag)方法。它还定义了一个getData()方法,我们的子类将实现这个方法来处理record记录。我们并没有明确地要求子类实现setData()方法,但我们最好还是实现这个方法以实现程序的对称性(或者在构造函数中实现)。作为Mapper的输出,TaggedMapOutput需要是Writable类型,因此的子类还需要实现readFields()和write()方法。


DataJoinMapperBase:
-------------------

  回忆join数据流图,mapper的主要功能就是打包一个record使其能够和其他拥有相同group key的记录去向一个Reducer。DataJoinMapperBase完成所有的打包工作,这个类定义了三个虚类让我们的子类实现:

  protected abstract Text generateInputTag(String inputFile);

  protected abstract TaggedMapOutput generateTaggedMapOutut(Object value);

  protected abstract Text generateGroupKey(TaggedMapOutput aRecored);

  

  在一个map任务开始之前为所有这个map任务会处理的记录定义一个tag(Text),结果将保存到DataJoinMapperBase的inputTag变量中,我们也可以保存filename至inputFile变量中以待后用。


  在map任务初始化之后,DataJoinMapperBase的map()方法会对每一个记录执行。它调用了两个我们还没有实现的虚方法:generateTaggedMapOutput()以及generateGroupKey(aRecord);(详见代码)

DataJoinReducerBase:
--------------------

DataJoinMapperBase将我们所需要做的工作以一个full outer join的方式简化。我们的Reducer子类只需要实现combine()方法来滤除掉我们不需要的组合来得到我们需要的(inner join, left outer join等)。同时我们也在combiner()中将我们的组合格式化为输出格式。
/hadoop-2.6.0/share/hadoop/tools/lib   程序需要自己手动导入Jar包

MapperClass.java

package com.yc.zzg.test;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends DataJoinMapperBase{

    @Override
    protected Text generateGroupKey(TaggedMapOutput arg0) {
        String line = ((Text)arg0.getData()).toString();
        String[] tokens = line.split(",");
        String groupKey = tokens[0];
        return new Text(groupKey);
    }

    @Override
    protected Text generateInputTag(String arg0) {

        return new Text(arg0);
    }

    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object arg0) {
        TaggedWritable tw = new TaggedWritable((Text)arg0);
        tw.setTag(this.inputTag);
        return tw;
    }

}

TaggedWritable.java

package com.yc.zzg.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable() {  
    }  

    public TaggedWritable(Writable data) {  
         this.tag = new Text("");  
         this.data = data;  
     } 

    @Override
    public Writable getData() {
        return data;
    }



    public void setData(Writable data) {
        this.data = data;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
          this.tag.readFields(in);    
          //加入此部分代码,否则,可能报空指针异常  
          String temp=in.readUTF();  
          if (this.data == null|| !this.data.getClass().getName().equals(temp)) {  
              try {  
                  this.data = (Writable) ReflectionUtils.newInstance(  
                          Class.forName(temp), null);  
              } catch (ClassNotFoundException e) {  
                  e.printStackTrace();  
              }  
          }  
          this.data.readFields(in);    
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);    
        //此行代码很重要  
        out.writeUTF(this.data.getClass().getName());  

        this.data.write(out);   
    }

}

Reduce.java

package com.yc.zzg.test;

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class Reduce extends DataJoinReducerBase {

    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
         if(tags.length<2)return null;
          StringBuffer joinData = new StringBuffer();
          int count=0;

            for(Object value: values){
                joinData.append(",");
                TaggedWritable tw = (TaggedWritable)value;
                String recordLine = ((Text)tw.getData()).toString();
                String[] tokens = recordLine.split(",",2);
                if(count==0) joinData.append(tokens[0]);
                joinData.append(tokens[1]);
            }

            TaggedWritable rtv = new TaggedWritable(new Text(new String(joinData)));
            rtv.setTag((Text)tags[0]);
            return rtv;
    }
}

Drive.java

package com.yc.zzg.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Drive {

    public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();   
            JobConf job = new JobConf(conf, Drive.class);  

            Path in = new Path("hdfs://localhost:9000/input/inputtest/*");
            Path out = new Path("hdfs://localhost:9000/output/test20");
            FileInputFormat.setInputPaths(job, in);  
            FileOutputFormat.setOutputPath(job, out);  
            job.setJobName("DataJoin");  
            job.setMapperClass(MapperClass.class);  
            job.setReducerClass(Reduce.class);  

            job.setInputFormat(TextInputFormat.class);  
            job.setOutputFormat(TextOutputFormat.class);  
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(TaggedWritable.class);  
            job.set("mapred.textoutputformat.separator", ",");  
            JobClient.runJob(job); 
    }
}

Customers.txt

1,Stephanie Leung,555-555-5555      
2,Edward Kim,123-456-7890         
3,Jose Madriz,281-330-8004         
4,David Stork,408-555-0000          

Orders.txt

 3,A,12.95,02-Jun-2008
 1,B,88.25,20-May-2008
 2,C,32.00,30-Nov-2007
 3,D,25.02,22-Jan-2009

所碰到问题有几点,提出来和大家分析一下
1。第一个问题是DataJoinMapperBase包的问题,前面已经解决了
2。第二个问题是原来的程序会报一个

java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>()


Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.yc.zzg.test.TaggedWritable.<init>()

所以你需要给一个构造方法

 public TaggedWritable() {  
    }  

3。第三个问题是我有两个多个文件怎么导入,你将多个文件放入同一个文件夹里然后用

   Path in = new Path("hdfs://localhost:9000/input/inputtest/*");

就可以导入多个文件啦,同理也可以拼file*.txt之类的

4。有的时候我为了测试一个工程,从test1测试到了test20,为了方便我们输出的时候总是要创建一个新的目录,解决方案如下

1。hadoop需要把集群上的core-site.xml和hdfs-site.xml放到当前工程下。eclipse工作目录的bin文件夹下面

2。    FileSystem fs=FileSystem.get(conf); 
            if(fs.exists(out)){  
                fs.delete(out, true);  
                System.out.println("输出路径存在,已删除!");  
            }  

版权声明:本文为博主原创文章,未经博主允许不得转载。

Reduce侧联接

标签:datajoinma   reduce侧联接   

原文地址:http://blog.csdn.net/ac_great/article/details/48010355

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!