联接两张表
Table EMP(新建文件 EMP,文件中不要写入第一行的属性名):
Name Sex Age DepNo
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2
Table DEP(新建文件 DEP,文件中不要写入第一行的属性名):
DepNo DepName
1 Sales
2 Dev
3 Mgt
Inner join:
select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo
Result:
Name Sex Age DepName
zhang male 20 Sales
li female 25 Dev
wang female 30 Mgt
zhou male 35 Dev
接下来使用MapReduce实现Join操作。
reduce端联接比map端联接更普遍,因为它不要求输入数据具有特定的结构;但它的效率较低,因为所有数据都必须经过shuffle过程。不过,它编写起来比较简单。
基本思路:
1、Map端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;
2、在reduce处理函数里,按照标识对数据进行保存;
3、然后对同一个Key下的两类数据进行Join,求出结果后直接输出;
package Join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Value object shuffled from the mapper to the reducer in the reduce-side join.
 *
 * <p>An instance carries either an employee row (name, sex, age, department number) or a
 * department row (department number and department name). The {@code table} tag records which
 * source the row came from, so the reducer can tell the two kinds apart after the shuffle.
 *
 * <p>String fields are serialized with {@link DataOutput#writeUTF(String)}, which rejects
 * {@code null}. They therefore default to {@code ""}, and callers must not set them to
 * {@code null}.
 */
public class EmpJoinDep implements WritableComparable<EmpJoinDep> {
    private String name = "";
    private String sex = "";
    private int age = 0;
    private int depNo = 0;
    private String depName = "";
    /** Source-table tag (the job in this project uses "EMP" or "DEP"). */
    private String table = "";

    /** No-arg constructor; Hadoop needs it to create an instance before calling readFields. */
    public EmpJoinDep() {}

    /**
     * Copy constructor. The reducer uses it to snapshot a value before buffering it.
     *
     * @param other the instance whose fields are copied
     */
    public EmpJoinDep(EmpJoinDep other) {
        this.name = other.getName();
        this.sex = other.getSex();
        this.age = other.getAge();
        this.depNo = other.getDepNo();
        this.depName = other.getDepName();
        this.table = other.getTable();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getDepNo() {
        return depNo;
    }

    public void setDepNo(int depNo) {
        this.depNo = depNo;
    }

    public String getDepName() {
        return depName;
    }

    public void setDepName(String depName) {
        this.depName = depName;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Serializes all fields. The order must match {@link #readFields(DataInput)}.
     *
     * @throws IOException if the underlying output fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
        out.writeInt(depNo);
        out.writeUTF(depName);
        out.writeUTF(table);
    }

    /**
     * Deserializes all fields in the same order that {@link #write(DataOutput)} wrote them.
     *
     * @throws IOException if the underlying input fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.age = in.readInt();
        this.depNo = in.readInt();
        this.depName = in.readUTF();
        this.table = in.readUTF();
    }

    /**
     * Imposes no ordering: every instance compares equal. This is deliberate, because the class
     * is used as a map-output value, not as a key.
     *
     * <p>Note: this is not consistent with {@code equals}. Do not use this class as a key or in
     * sorted collections.
     */
    @Override
    public int compareTo(EmpJoinDep o) {
        return 0;
    }

    /** Human-readable form written to the job output; omits DepNo and the table tag. */
    @Override
    public String toString() {
        return "EmpJoinDep [Name=" + name + ", Sex=" + sex + ", Age=" + age
                + ", DepName=" + depName + "]";
    }
}
package Join;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Reduce-side inner join of the EMP and DEP tables on DepNo. It is equivalent to
 * {@code select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo}.
 *
 * <p>Both files are read from {@link #INPUT_PATH}. The mapper tells the tables apart by column
 * count (4 columns = EMP, 2 columns = DEP), so the input files must not contain header rows.
 * Lines that match neither shape, or whose numeric columns fail to parse, are skipped and
 * counted under the {@code "ReduceJoin"} counter group.
 */
public class ReduceJoin {
    private final static String INPUT_PATH = "hdfs://liguodong:8020/inputjoin";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputmapjoin";

    /** Counter group for input lines the mapper had to skip. */
    private static final String COUNTER_GROUP = "ReduceJoin";

    /**
     * Parses one line of either table, tags it with its source, and emits it keyed by DepNo.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, EmpJoinDep> {
        // Reused across calls; context.write serializes them immediately, so reuse is safe.
        private final IntWritable outKey = new IntWritable();
        private final EmpJoinDep empJoinDep = new EmpJoinDep();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // trim() first: a leading space would otherwise yield an empty first token.
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] values = line.split("\\s+");
            int depNo;
            try {
                if (values.length == 4) {
                    // Parse before mutating so a bad line leaves no half-written record.
                    int age = Integer.parseInt(values[2]);
                    depNo = Integer.parseInt(values[3]);
                    empJoinDep.setName(values[0]);
                    empJoinDep.setSex(values[1]);
                    empJoinDep.setAge(age);
                    empJoinDep.setDepNo(depNo);
                    // Clear the field from a previous DEP record held by this reused object.
                    empJoinDep.setDepName("");
                    empJoinDep.setTable("EMP");
                } else if (values.length == 2) {
                    depNo = Integer.parseInt(values[0]);
                    // Clear fields left over from a previous EMP record.
                    empJoinDep.setName("");
                    empJoinDep.setSex("");
                    empJoinDep.setAge(0);
                    empJoinDep.setDepNo(depNo);
                    empJoinDep.setDepName(values[1]);
                    empJoinDep.setTable("DEP");
                } else {
                    context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                    return;
                }
            } catch (NumberFormatException e) {
                // e.g. a header row such as "Name Sex Age DepNo" that was not removed.
                context.getCounter(COUNTER_GROUP, "UNPARSEABLE_LINES").increment(1);
                return;
            }
            outKey.set(depNo);
            context.write(outKey, empJoinDep);
        }
    }

    /**
     * For one DepNo, finds the department name and attaches it to every employee of that
     * department. Employees whose DepNo has no DEP row are dropped (inner-join semantics).
     * If several DEP rows share a DepNo, the last one seen wins.
     */
    public static class MyReducer extends Reducer<IntWritable, EmpJoinDep, NullWritable, EmpJoinDep> {
        @Override
        protected void reduce(IntWritable key, Iterable<EmpJoinDep> values,
                Context context)
                throws IOException, InterruptedException {
            String depName = null;
            List<EmpJoinDep> employees = new ArrayList<EmpJoinDep>();
            for (EmpJoinDep val : values) {
                if ("DEP".equals(val.getTable())) {
                    depName = val.getDepName();
                } else if ("EMP".equals(val.getTable())) {
                    // The framework reuses the value instance while iterating, so copy it
                    // before buffering.
                    employees.add(new EmpJoinDep(val));
                }
            }
            if (depName == null) {
                // No matching department: an inner join emits nothing for these employees.
                return;
            }
            for (EmpJoinDep employee : employees) {
                employee.setDepName(depName);
                context.write(NullWritable.get(), employee);
            }
        }
    }

    /**
     * Deletes any previous output directory, then configures and runs the join job.
     * Exits with 0 on success and 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outputPath = new Path(OUTPUT_PATH);
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        // The job refuses to start if the output directory already exists.
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "Reduce Join");
        job.setJarByClass(ReduceJoin.class);
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmpJoinDep.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmpJoinDep.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
运行结果:
**上传数据:**
[root@liguodong file]# vi EMP
[root@liguodong file]# vi DEP
[root@liguodong file]# hdfs dfs -mkdir /inputjoin
[root@liguodong file]# hdfs dfs -put EMP /inputjoin/
[root@liguodong file]# hdfs dfs -put DEP /inputjoin/
[root@liguodong file]# hdfs dfs -cat /inputjoin/DEP
1 Sales
2 Dev
3 Mgt
[root@liguodong file]# hdfs dfs -cat /inputjoin/EMP
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2
[root@liguodong file]# hdfs dfs -cat /outputmapjoin/p*
EmpJoinDep [Name=zhang, Sex=male, Age=20, DepName=Sales]
EmpJoinDep [Name=zhou, Sex=male, Age=35, DepName=Dev]
EmpJoinDep [Name=li, Sex=female, Age=25, DepName=Dev]
EmpJoinDep [Name=wang, Sex=female, Age=30, DepName=Mgt]
原文地址:http://blog.csdn.net/scgaliguodong123_/article/details/46501855