一个MapReduce任务很可能访问和处理两个甚至多个数据集,在关系型数据库中,这将是两个或者多个表的连接,但是Hadoop系统没有关系型数据库中那样强大的连接处理功能,因此处理复杂一些。一般来讲,hadoop可以采用这几种数据连接方式:
1采用DataJoin类库实现Reduce端连接的方法
2 用全局文件复制实现Map端连接方法
3 带Map端过滤的Reduce端连接方法
Hadoop的Mapreduce框架提供了一种较为通用 的多数据源连接方法,该方法用DataJoin类库为程序员提供了完成数据连接所需的编程框架和接口,其处理方法如下:
为了完成不用数据源的连接操作,我们必须给每个数据源制定一个标签(tag),用来区分数据,就像关系型数据库中表名一样,这里我们需要实现 Text generateInputTag(String inputFile)方法;
另外,为了进行连接操作,我们必须知道连接的主键是什么,类似于关系型数据库中的key,因此我们需要指定groupKey,这里我们需要实现 Text generateGroupKey(TaggedMapOutput aRecord)
然后在Map端我们需要把原始数据包装成为一个带标签的数据记录,方便shuffle和Reduce端执行笛卡尔积,所以我们需要实现 TaggedMapOutput generateTaggedMapOutput(Object value);
总结一下Map处理过程:
Datajoin类库首先提哦功能管理一个抽象基类DataJoinMapperBase,该基类实现了map()方法,帮助程序员对每个数据源下的记录生成一个代标签的数据记录对象。Map端处理过程中,需要指定标签tag和Groupkey,然后包装成为带标签的数据记录对象,在shuffle过程中,这些GroupKey相同的记录被分到同一个Reduce节点上。
Reduce处理过程:
Reduce节点收到这些带标签的数据记录后,Reduce过程将这些带不同的数据源标签的记录执行笛卡尔积,自动生成所有不同的叉积组合,由程序员实现一个combine()方法,根据应用程序需求将这些具有相同的Groupkey的数据记录进行适当的合并处理,以此完成类似于关系型数据库中不同实体数据记录之间的连接。
在Reduce阶段我们需要继承DataJoinReduceBase,该基类实现了reduce()方法,我们只是需要实现combine()方法即可,另外我们还是需要继承TaggedMapOutput类,它描述了一个标签化的数据记录,实现了getTag(),setTag()方法,作为Mapper的key_value输出value类型,由于需要I/O,我们需要继承并且实现Writable接口,并且实现getData()方法用以读取记录数据
下面是数据源:
user.txt文件:
1,张三,135xxxxxxxx
2,李四,136xxxxxxxx
3,王五,137xxxxxxxx
4,赵六,138xxxxxxxx
order.txt文件:
3,A,13,2013-02-12
1,B,23,2013-02-14
2,C,16,2013-02-17
3,D,25,2013-03-12
这其中需要注意很多小细节,因为没有要求程序员实现Map和reduce方法,所以我们会很容易忽略很多东西,需要注意的东西我在下面一一注释了:
我们必须使用Jobconf 来声明一个job,同时使用JobClient来run job,另外我们在继承TaggedMapOutput的时候默认的无参构造方法中需要初始化data
package joinTest; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class DataJoin{ public static class MyTaggedMapOutput extends TaggedMapOutput{ private Writable data; public MyTaggedMapOutput(){ //一定要new一下,不然反序列化时会报空指针异常 this.data=new Text(); } public MyTaggedMapOutput(Writable data){ this.tag=new Text(""); this.data=data; } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); this.data.readFields(in); } public void write(DataOutput output) throws IOException { this.tag.write(output); //this.tag.write(output); //大问题,粗心写成了this.tag.write(output); 结果一直报错 this.data.write(output); } @Override public Writable getData() { return data; } } public static class DataJoinMapper extends DataJoinMapperBase{ protected Text generateInputTag(String inputFile){ //String datasource=inputFile.substring(inputFile.lastIndexOf("/")+1).split("\\.")[0]; String datasource = inputFile.split("-")[0]; System.out.println("datasource:"+datasource); return new Text(datasource); } protected TaggedMapOutput generateTaggedMapOutput(Object value){ TaggedMapOutput tm=new MyTaggedMapOutput((Text)value); tm.setTag(this.inputTag); return tm; } @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line=aRecord.getData().toString(); //String groupkey=line.split("\\s")[0]; String groupkey=line.split(",")[0]; return new Text(groupkey); } } public static class DataJoinReducer extends DataJoinReducerBase{ @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { if(tags.length<2){ return null; } String output = ""; /* for(int i=0;i<tags.length;i++){ TaggedMapOutput tat=(MyTaggedMapOutput)values[i];System.out.println("tags:"+tags[i]+" values:"+tat.getData().toString()); if(i==0){ output=tat.getData().toString();//System.out.println("i==0 output:"+output); }else{ output+="\t"; String [] s=tat.getData().toString().split("\\s",2); System.out.println("s.length:"+s.length); output+=s[0]; }*/ for(int j=0;j<tags.length;j++){ //TaggedMapOutput taOutput=(TaggedMapOutput)tags[j]; TaggedMapOutput taggedMapOutput=(TaggedMapOutput)values[j]; System.out.println("tag:"+taggedMapOutput.getTag()+" value:"+taggedMapOutput.getData().toString()); } for(int i=0;i<values.length;i++){ TaggedMapOutput tat=(MyTaggedMapOutput)values[i]; String recordLine=((Text)tat.getData()).toString(); String [] tokens=recordLine.split(",",2);System.out.println("data:"+recordLine); if(i>0) output+=","; output+=tokens[1]; } TaggedMapOutput tag=new MyTaggedMapOutput(new Text(output)); tag.setTag((Text)tags[0]); return tag; } } /* * 这里一定要注意,FileInputFormat和FileOutputFormat一定要是org.apache.hadoop.mapred下面的包 */ public static int run(String args[]) throws IOException{ Configuration conf=new Configuration(); //Configuration conf=getConf(); JobConf job=new JobConf(conf, DataJoin.class); //Job job=new Job(conf,"DataJoin"); job.setJobName("DataJoin"); // job.setJarByClass(DataJoin.class); job.setMapperClass(DataJoinMapper.class); job.setReducerClass(DataJoinReducer.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MyTaggedMapOutput.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //job.set("mapred.textoutputformat.separator", "\t"); job.set("mapred.textoutputformat.separator", ","); //FileInputFormat.addInputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2")); //FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2")); //FileInputFormat.addInputPaths(job, "/home/hadoop/test/DataJoinTest/province.txt"); //MultipleInputs.addInputPath(job, new Path("/home/hadoop/test/DataJoinTest/province.txt"), TextInputFormat.class); FileInputFormat.addInputPaths(job, args[0]); //FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2_Out")); //FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2_result")); FileOutputFormat.setOutputPath(job, new Path(args[1])); JobClient.runJob(job); return 0; } public static void main(String [] args) throws Exception{ String[] arg = { "/home/hadoop/test/mapReduce/DataJoinTest2", "/home/hadoop/test/mapReduce/DataJoinTest2_result" }; int res=run(arg); System.exit(res); } }
原文地址:http://blog.csdn.net/weiweiyixiaocsdn/article/details/45457461