标签:
1、项目名称:
package com.mtjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Reduce-side join of two plain-text tables that share an address ID.
 *
 * <p>Two row shapes are recognised, distinguished by their first character:
 * <ul>
 *   <li>factory rows: {@code <factoryname> <addressID>} (the name may contain spaces),</li>
 *   <li>address rows: {@code <addressID> <addressname>} (start with a digit).</li>
 * </ul>
 * The mapper keys every row by its address ID and tags the value with {@code "1+"}
 * (factory) or {@code "2+"} (address); the reducer emits the cross product of the
 * two sides for each ID as {@code factoryname -> addressname}.
 */
public class MTjoin {

    /**
     * Legacy counter that used to guard the one-time header row. It is kept only so
     * that existing references to {@code MTjoin.time} keep compiling; the reducer now
     * tracks the header per instance and no longer reads or writes this field.
     */
    public static int time = 0;

    /** Tags each input row with its source table and keys it by address ID. */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * Parses one input line and emits {@code (addressID, tag + payload)}.
         *
         * <p>Header rows (containing {@code factoryname} or {@code addressID}), blank
         * lines and lines that consist of a single token are skipped instead of
         * failing the task.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of either table
         * @param context sink for the tagged key/value pair
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("mapper........................");
            // Trimming also drops a trailing '\r', which would otherwise end up in the join key.
            String line = value.toString().trim();

            if (line.isEmpty() || line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            if (isAsciiDigit(line.charAt(0))) {
                // Address row: the ID is the whole first token, not just its first character.
                int idEnd = firstWhitespace(line);
                if (idEnd < 0) {
                    return; // ID without a name: nothing to join
                }
                String addressId = line.substring(0, idEnd);
                String addressName = line.substring(idEnd).trim();
                System.out.println("key:" + addressId + " value:" + addressName);
                context.write(new Text(addressId), new Text("2+" + addressName));
            } else {
                // Factory row: the ID is the last token; everything before it is the name.
                int idStart = lastWhitespace(line);
                if (idStart < 0) {
                    return; // name without an ID: nothing to join
                }
                String addressId = line.substring(idStart + 1);
                String factoryName = line.substring(0, idStart).trim();
                System.out.println("key:" + addressId + " value:" + factoryName);
                context.write(new Text(addressId), new Text("1+" + factoryName));
            }
        }

        /** Returns true for the characters '0' through '9' inclusive. */
        private static boolean isAsciiDigit(char c) {
            return c >= '0' && c <= '9';
        }

        /** Index of the first whitespace character in {@code s}, or -1 if there is none. */
        private static int firstWhitespace(String s) {
            for (int i = 0; i < s.length(); i++) {
                if (Character.isWhitespace(s.charAt(i))) {
                    return i;
                }
            }
            return -1;
        }

        /** Index of the last whitespace character in {@code s}, or -1 if there is none. */
        private static int lastWhitespace(String s) {
            for (int i = s.length() - 1; i >= 0; i--) {
                if (Character.isWhitespace(s.charAt(i))) {
                    return i;
                }
            }
            return -1;
        }
    }

    /** Joins the factory and address rows that share one address ID. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /** Whether this reducer instance has already written the header row. */
        private boolean headerWritten = false;

        /**
         * Emits one {@code (factoryname, addressname)} pair for every combination of
         * factory row and address row received for {@code key}. Keys that have rows
         * from only one table produce no output.
         *
         * @param key     the address ID
         * @param values  tagged payloads: {@code "1+<factoryname>"} or {@code "2+<addressname>"}
         * @param context sink for the joined rows
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reducer........................");

            // Header goes out once, on the first key this reducer instance handles.
            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true;
            }

            // Growable lists: a single address may have any number of factories.
            List<String> factories = new ArrayList<String>();
            List<String> addresses = new ArrayList<String>();

            for (Text tagged : values) {
                String record = tagged.toString();
                String payload = record.substring(2); // strip the two-character "N+" tag
                if (record.charAt(0) == '1') {
                    factories.add(payload);
                } else {
                    addresses.add(payload);
                }
            }

            // Nested loops do nothing when either side is empty.
            for (String factoryName : factories) {
                for (String addressName : addresses) {
                    context.write(new Text(factoryName), new Text(addressName));
                    System.out.println("factoryname:" + factoryName + " addressname:" + addressName);
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by exactly two paths: input and output
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:MTjoin<in><out>");
            System.exit(2);
        }

        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
版本二(简化版):
package com.mtjoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Simplified reduce-side join of a factory table and an address table.
 *
 * <p>Factory rows look like {@code <factoryname> <addressID>}; address rows look like
 * {@code <addressID> <addressname>} and are recognised by a leading digit. Fields are
 * separated by a single space. Rows are keyed by address ID in the mapper and paired
 * up in the reducer.
 */
public class MTjoin {

    /**
     * Legacy counter that used to guard the one-time header row. It is kept only so
     * that existing references to {@code MTjoin.time} keep compiling; the reducer now
     * tracks the header per instance and no longer reads or writes this field.
     */
    public static int time = 0;

    /** Splits each row at its separator space and emits it under its address ID. */
    public static class Map extends Mapper<Object, Text, Text, Text> {

        /**
         * Emits {@code (addressID, "1+" + factoryname)} for factory rows and
         * {@code (addressID, "2+" + addressname)} for address rows.
         *
         * <p>Header rows, blank lines and rows without a separator space are skipped.
         * The ID is taken as a whole token, so it may be longer than one character.
         *
         * @param key     input key supplied by the input format; not used
         * @param value   one line of either table
         * @param context sink for the tagged key/value pair
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("mapper........................");
            String line = value.toString().trim();

            if (line.isEmpty() || line.contains("factoryname") || line.contains("addressID")) {
                return;
            }

            char first = line.charAt(0);
            boolean startsWithDigit = first >= '0' && first <= '9';

            // Address rows split at the first space, factory rows at the last one,
            // because only the factory name may itself contain spaces before the ID.
            int split = startsWithDigit ? line.indexOf(' ') : line.lastIndexOf(' ');
            if (split < 0) {
                return; // single token: no ID/name pair to emit
            }

            String head = line.substring(0, split).trim();
            String tail = line.substring(split + 1).trim();

            if (startsWithDigit) {
                System.out.println("key:" + head + " value:" + tail);
                context.write(new Text(head), new Text("2+" + tail));
            } else {
                System.out.println("key:" + tail + " value:" + head);
                context.write(new Text(tail), new Text("1+" + head));
            }
        }
    }

    /** Pairs every factory with every address that arrived under the same ID. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        /** Whether this reducer instance has already written the header row. */
        private boolean headerWritten = false;

        /**
         * Writes the cross product of factory names and address names for one ID.
         * Nothing is written for an ID that appears in only one of the tables.
         *
         * @param key     the address ID
         * @param values  tagged payloads: {@code "1+<factoryname>"} or {@code "2+<addressname>"}
         * @param context sink for the joined rows
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reducer........................");

            if (!headerWritten) {
                context.write(new Text("factoryname"), new Text("addressname"));
                headerWritten = true;
            }

            // Lists instead of fixed-size arrays so large groups cannot overflow.
            List<String> factoryNames = new ArrayList<String>();
            List<String> addressNames = new ArrayList<String>();

            Iterator<Text> it = values.iterator();
            while (it.hasNext()) {
                String record = it.next().toString();
                // First character is the table tag; the payload follows the '+'.
                List<String> side = record.charAt(0) == '1' ? factoryNames : addressNames;
                side.add(record.substring(2));
            }

            if (factoryNames.isEmpty() || addressNames.isEmpty()) {
                return;
            }

            for (String factoryName : factoryNames) {
                for (String addressName : addressNames) {
                    context.write(new Text(factoryName), new Text(addressName));
                    System.out.println("factoryname:" + factoryName + " addressname:" + addressName);
                }
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args generic Hadoop options followed by exactly two paths: input and output
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage:MTjoin<in><out>");
            System.exit(2);
        }

        Job job = new Job(conf, "multiple table join");
        job.setJarByClass(MTjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
标签:
原文地址:http://www.cnblogs.com/yangyquin/p/5021190.html