标签:
// Inner join: a key contributes output only when both sides have records,
// in which case every A record is paired with every B record.
if (!(listA.isEmpty() || listB.isEmpty())) {
    for (Text left : listA) {
        for (Text right : listB) {
            context.write(left, right);
        }
    }
}
// For each entry in A, for (Text A : listA) { // If list B is not empty, join A and B if (!listB.isEmpty()) { for (Text B : listB) { context.write(A, B); } } else { // Else, output A by itself context.write(A, EMPTY_TEXT); } }
// For each entry in B, for (Text B : listB) { // If list A is not empty, join A and B if (!listA.isEmpty()) { for (Text A : listA) { context.write(A, B); } } else { // Else, output B by itself context.write(EMPTY_TEXT, B); } }
// If list A is not empty if (!listA.isEmpty()) { // For each entry in A for (Text A : listA) { // If list B is not empty, join A with B if (!listB.isEmpty()) { for (Text B : listB) { context.write(A, B); } } else { // Else, output A by itself context.write(A, EMPTY_TEXT); } } } else { // If list A is empty, just output B for (Text B : listB) { context.write(EMPTY_TEXT, B); } }
// If list A is empty and B is empty or vice versa if (listA.isEmpty() ^ listB.isEmpty()) { // Iterate both A and B with null values // The previous XOR check will make sure exactly one of // these lists is empty and therefore the list will be skipped for (Text A : listA) { context.write(A, EMPTY_TEXT); } for (Text B : listB) { context.write(EMPTY_TEXT, B); } }
---------------------------
username cityid
---------------------------
Li lei, 1
Xiao hong, 2
Lily, 3
Lucy, 3
Daive, 4
Jake, 5
Xiao Ming, 6
---------------------------
cityid cityname
---------------------------
1, Shanghai
2, Beijing
3, Jinan
4, Guangzhou
7, Wuhan
8, Shenzhen
package com.study.hadoop.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side join of a "user" data set ({@code username,cityid}) with a
 * "city" data set ({@code cityid,cityname}) on the city id.
 *
 * <p>Each mapper tags its records ({@code A} for users, {@code B} for cities)
 * and emits them under the city id; the reducer separates the two sides again
 * and combines them according to the configured join type
 * ({@code inner}, {@code leftouter}, {@code rightouter}, {@code fullouter},
 * {@code anti}; case-insensitive).
 */
public class ReduceJoin {

    /** Configuration key under which the join type is passed to the reducer. */
    private static final String JOIN_TYPE_KEY = "join.type";

    /** Counter group used to report input lines that could not be parsed. */
    private static final String COUNTER_GROUP = "ReduceJoin";

    /** Tag prefixed to values coming from the user data set. */
    private static final char TAG_USER = 'A';

    /** Tag prefixed to values coming from the city data set. */
    private static final char TAG_CITY = 'B';

    /**
     * Tells whether {@code joinType} names one of the joins implemented by
     * {@link JoinReducer}. The comparison ignores case.
     *
     * @param joinType the candidate join type, may be {@code null}
     * @return {@code true} if the reducer knows how to perform this join
     */
    private static boolean isSupportedJoinType(String joinType) {
        return joinType != null
                && (joinType.equalsIgnoreCase("inner")
                        || joinType.equalsIgnoreCase("leftouter")
                        || joinType.equalsIgnoreCase("rightouter")
                        || joinType.equalsIgnoreCase("fullouter")
                        || joinType.equalsIgnoreCase("anti"));
    }

    /**
     * Mapper for user records of the form {@code username,cityid}.
     * Emits {@code (cityid, "A" + username)}.
     */
    public static class UserJoinMapper extends Mapper<Object, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            if (items.length < 2) {
                // Blank or malformed line: count it and move on instead of
                // failing the task with an ArrayIndexOutOfBoundsException.
                context.getCounter(COUNTER_GROUP, "MALFORMED_USER_RECORDS").increment(1);
                return;
            }
            // Fields are trimmed so that "Li lei, 1" produces the key "1" and
            // matches the key produced from "1, Shanghai" by the city mapper.
            outKey.set(items[1].trim());
            outValue.set(TAG_USER + items[0].trim());
            context.write(outKey, outValue);
        }
    }

    /**
     * Mapper for city records of the form {@code cityid,cityname}.
     * Emits {@code (cityid, "B" + cityname)}.
     */
    public static class CityJoinMapper extends Mapper<Object, Text, Text, Text> {
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            if (items.length < 2) {
                // Blank or malformed line: count it and move on.
                context.getCounter(COUNTER_GROUP, "MALFORMED_CITY_RECORDS").increment(1);
                return;
            }
            // Trim both fields; see UserJoinMapper for why the key must be trimmed.
            outKey.set(items[0].trim());
            outValue.set(TAG_CITY + items[1].trim());
            context.write(outKey, outValue);
        }
    }

    /**
     * Reducer that receives all tagged values for one city id, splits them
     * back into the user side (A) and the city side (B), and writes the
     * joined pairs for the join type read from {@code join.type}.
     */
    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        /** Placeholder written for the missing side of an outer/anti join. */
        private static final Text EMPTY_VALUE = new Text("");

        /** Join type: one of inner, leftouter, rightouter, fullouter, anti. */
        private String joinType = null;

        private final List<Text> listA = new ArrayList<Text>();
        private final List<Text> listB = new ArrayList<Text>();

        /**
         * Reads the join type from the job configuration.
         *
         * @throws IllegalArgumentException if {@code join.type} is missing or
         *         is not a supported join type
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String configured = context.getConfiguration().get(JOIN_TYPE_KEY);
            joinType = configured == null ? null : configured.trim();
            if (!isSupportedJoinType(joinType)) {
                // Fail fast: otherwise a missing value would surface later as a
                // NullPointerException and a typo as silently empty output.
                throw new IllegalArgumentException(
                        "Unsupported " + JOIN_TYPE_KEY + ": " + configured
                                + " (expected inner, leftouter, rightouter, fullouter or anti)");
            }
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            listA.clear();
            listB.clear();
            for (Text tagged : values) {
                String value = tagged.toString();
                if (value.isEmpty()) {
                    // No tag character to inspect; nothing to join.
                    continue;
                }
                char tag = value.charAt(0);
                // A fresh Text is created for every record because the lists
                // must hold independent copies of the payload.
                if (tag == TAG_USER) {
                    listA.add(new Text(value.substring(1)));
                } else if (tag == TAG_CITY) {
                    listB.add(new Text(value.substring(1)));
                }
            }
            joinAndWrite(context);
        }

        /**
         * Writes the join result for the current key from {@code listA} and
         * {@code listB} according to {@code joinType}.
         */
        private void joinAndWrite(Context context) throws IOException, InterruptedException {
            if (joinType.equalsIgnoreCase("inner")) {
                // Inner join: only keys present on both sides produce output.
                if (!listA.isEmpty() && !listB.isEmpty()) {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase("leftouter")) {
                // Left outer join: every A is written, alone if B is empty.
                if (listB.isEmpty()) {
                    for (Text a : listA) {
                        context.write(a, EMPTY_VALUE);
                    }
                } else {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase("rightouter")) {
                // Right outer join: every B is written, alone if A is empty.
                for (Text b : listB) {
                    if (listA.isEmpty()) {
                        context.write(EMPTY_VALUE, b);
                    } else {
                        for (Text a : listA) {
                            context.write(a, b);
                        }
                    }
                }
            } else if (joinType.equalsIgnoreCase("fullouter")) {
                // Full outer join: both sides are always written.
                if (listA.isEmpty()) {
                    for (Text b : listB) {
                        context.write(EMPTY_VALUE, b);
                    }
                } else if (listB.isEmpty()) {
                    for (Text a : listA) {
                        context.write(a, EMPTY_VALUE);
                    }
                } else {
                    writeCrossProduct(context);
                }
            } else if (joinType.equalsIgnoreCase("anti")) {
                // Anti join: only keys present on exactly one side are written.
                // The XOR guarantees that one of the two loops is a no-op.
                if (listA.isEmpty() ^ listB.isEmpty()) {
                    for (Text a : listA) {
                        context.write(a, EMPTY_VALUE);
                    }
                    for (Text b : listB) {
                        context.write(EMPTY_VALUE, b);
                    }
                }
            }
        }

        /** Writes every (A, B) pair, iterating A in the outer loop. */
        private void writeCrossProduct(Context context) throws IOException, InterruptedException {
            for (Text a : listA) {
                for (Text b : listB) {
                    context.write(a, b);
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by
     *        {@code <UserInDir> <CityInDir> <OutDir> <join Type>}
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 4) {
            System.err.println("params:<UserInDir> <CityInDir> <OutDir> <join Type>");
            System.exit(1);
        }
        String joinType = otherArgs[3].trim();
        if (!isSupportedJoinType(joinType)) {
            // Reject a bad join type before submitting the job rather than
            // letting every reducer fail.
            System.err.println("unsupported join type: " + otherArgs[3]
                    + " (expected inner, leftouter, rightouter, fullouter or anti)");
            System.exit(1);
        }

        // NOTE(review): on Hadoop 2.x and later, Job.getInstance(conf, name) is the
        // non-deprecated replacement for this constructor; confirm the target
        // Hadoop version before switching.
        Job job = new Job(conf, "Reduce side join Job");
        job.setJarByClass(ReduceJoin.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Each input directory gets its own mapper so records can be tagged by origin.
        MultipleInputs.addInputPath(job, new Path(otherArgs[0]),
                TextInputFormat.class, UserJoinMapper.class);
        MultipleInputs.addInputPath(job, new Path(otherArgs[1]),
                TextInputFormat.class, CityJoinMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.getConfiguration().set(JOIN_TYPE_KEY, joinType);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
标签:
原文地址:http://blog.csdn.net/chaolovejia/article/details/46438519