码迷,mamicode.com
首页 > 其他好文 > 详细

Hadoop 自连接

时间:2015-01-02 09:50:09      阅读:224      评论:0      收藏:0      [点我收藏+]

标签:mapreduce 原理

                                           Hadoop自连接

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。

    child        parent 


Tom        Lucy


Tom        Jack 


Jone        Lucy


Jone        Jack


Lucy        Mary


Lucy        Ben


Jack        Alice


Jack        Jesse


Terry        Alice


Terry        Jesse


Philip        Terry


Philip        Alma


Mark        Terry


Mark        Alma



结果输出为:

Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse


代码为:


package com.hadoop.twelve;


import java.io.IOException;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * Mapper for a single-table self-join over a tab-separated child/parent file.
 *
 * <p>Every valid input line {@code child<TAB>parent} is emitted twice so that
 * both roles of a person meet under the same reduce key:
 * <ul>
 *   <li>{@code (parent, "0_" + child)} - the "left" table: the value is a child of the key;</li>
 *   <li>{@code (child, "1_" + parent)} - the "right" table: the value is a parent of the key.</li>
 * </ul>
 */
public class SelfJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Prefix marking a value that is a child of the key (left table). */
    private static final String LEFT_TAG = "0_";

    /** Prefix marking a value that is a parent of the key (right table). */
    private static final String RIGHT_TAG = "1_";

    /**
     * Splits one input line on the tab character and emits the two tagged records.
     * Lines that do not have exactly two fields, or whose fields are blank, are skipped.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of the input file
     * @param context sink for the tagged (person, relative) pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length != 2) {
            return;
        }

        // Trim so that stray spaces or a trailing '\r' do not turn "Jack " and "Jack"
        // into two different join keys, which would silently drop result rows.
        String child = fields[0].trim();
        String parent = fields[1].trim();
        if (child.isEmpty() || parent.isEmpty()) {
            return;
        }

        context.write(new Text(parent), new Text(LEFT_TAG + child));
        context.write(new Text(child), new Text(RIGHT_TAG + parent));
    }
}



package com.hadoop.twelve;


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {


@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
for (Text temp : values) {
String[] arr =temp.toString().split("_");
if("0".equals(arr[0])){
grandchild.add(arr[1]);
}else if("1".equals(arr[0])){
grandparent.add(arr[1]);
}
}

              //这里我也不太明白笛卡尔积就完成了???????求指点!!
for(String gc:grandchild){
for(String gp:grandparent){
context.write(new Text(gc), new Text(gp));
}
}


}

    //test测试
public static void main(String[] args) {
List<String> grandchild = new ArrayList<String>();
List<String> grandparent = new ArrayList<String>();
grandchild.add("a");
grandchild.add("b");
grandchild.add("c");
grandchild.add("d");

grandparent.add("a1");
grandparent.add("b1");
grandparent.add("c1");
grandparent.add("d1");

for(String gc:grandchild){
for(String gp:grandparent){
System.out.println(gc+"-------->"+gp);
}
}
}

}


package com.hadoop.twelve;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Driver that configures and submits the self-join MapReduce job.
 */
public class JobMain {

    /**
     * Runs the job and exits with status 0 on success or 1 on failure.
     * Exits with status 2 and a usage message if the two paths are not supplied.
     *
     * <p>Warning: an existing output path is deleted recursively before the job starts.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JobMain <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // Factory method replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(configuration, "self_join_job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path outputPath = new Path(args[1]);
        // Resolve the file system from the path itself so an output URI on a
        // non-default file system is checked and cleaned on the right one.
        FileSystem fs = outputPath.getFileSystem(configuration);
        if (fs.exists(outputPath)) {
            // The job refuses to start if the output directory already exists.
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


Hadoop 自连接

标签:mapreduce 原理

原文地址:http://blog.csdn.net/u010220089/article/details/42320929

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!