对于不同文件中的数据,有时候有对应关系,需要进行连接(join),获得一个新的文件以便进行分析。比如有两个输入文件a.txt,b.txt,其中的数据格式分别如下
1 a 2 b 3 c 4 d
1 good 2 bad 3 ok 4 hello
a good b bad c ok d hello
1.map阶段,将两个输入文件中的数据进行打散,如下:
1 a 1 good 2 b 2 bad 3 c 3 ok 4 d 4 hello
package cn.zhf.hadoop; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SingleJoin extends Configured implements Tool{ public static void main(String[] args) throws Exception { Tool tool = new SingleJoin(); ToolRunner.run(tool, args); print(tool); } @Override public int run(String[] arg0) throws Exception { Configuration conf = getConf(); Job job = new Job(); job.setJarByClass(getClass()); FileSystem fs = FileSystem.get(conf); fs.delete(new Path("out"),true); FileInputFormat.addInputPath(job, new Path("a.txt")); FileInputFormat.addInputPath(job, new Path("b.txt")); FileOutputFormat.setOutputPath(job,new Path("out")); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); return 0; } public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{ public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ String[] str = value.toString().split(" "); context.write(new Text(str[0]), new Text(str[1])); } } public static class JoinReducer extends Reducer<Text,Text,Text,Text>{ public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ Iterator<Text> iterator = values.iterator(); Text keyy = new Text(); Text valuee = new Text(); while(iterator.hasNext()){ Text temp = iterator.next(); if(temp.toString().length() == 1){ keyy.set(temp); valuee.set(iterator.next()); }else{ valuee.set(temp); keyy.set(iterator.next()); } } context.write(keyy, valuee); } } public static void print(Tool tool) throws IOException{ FileSystem fs = FileSystem.get(tool.getConf()); Path path = new Path("out/part-r-00000"); FSDataInputStream fsin = fs.open(path); int length = 0; byte[] buff = new byte[128]; while((length = fsin.read(buff,0,128)) != -1) System.out.println(new String(buff,0,length)); } }
原文地址:http://blog.csdn.net/ozhaohuafei/article/details/27812793