码迷,mamicode.com
首页 > 其他好文 > 详细

MR中简单实现自定义的输入输出格式

时间:2014-09-09 11:29:28      阅读:282      评论:0      收藏:0      [点我收藏+]

标签:blog   os   io   java   ar   for   div   cti   sp   

import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestCombine extends Configured implements Tool {
	/**
	 * Mapper that forwards every input value unchanged, using it as both the
	 * output key and the output value. The input key is not used.
	 */
	private static class ProvinceMapper extends Mapper<Object, Text, Text, Text> {
		/**
		 * Logs the incoming value together with the context, then emits
		 * {@code (value, value)}.
		 */
		@Override
		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			final String trace = "value : " + value + " Context " + context;
			System.out.println(trace);

			// The value doubles as the key of the emitted pair.
			context.write(value, value);
		}
	}

	/**
	 * Reducer that emits {@code (key, key)} once for every value grouped under
	 * the key. The values themselves are never read; only their count matters.
	 */
	private static class ProvinceReducer extends Reducer<Text, Text, Text, Text> {
		/**
		 * Logs the key and writes {@code (key, key)} for each element of
		 * {@code values}.
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// The loop variable is intentionally unused: one output record per grouped value.
			for (Text ignored : values) {
				final String trace = "reduce " + key;
				System.out.println(trace);
				context.write(key, key);
			}
		}
	}
	
	 // Input format: hands each combined split to a CombineFileRecordReader that
	 // delegates the individual files to CombineLineRecordReader.
	static class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
		/**
		 * Builds the record reader for a combined split.
		 *
		 * @param split   the split to read; it is cast to {@link CombineFileSplit}
		 * @param context the task attempt context passed on to the reader
		 * @return a {@link CombineFileRecordReader} backed by {@link CombineLineRecordReader}
		 * @throws IOException if the reader cannot be created
		 */
		@SuppressWarnings({ "unchecked", "rawtypes" })
		@Override
		public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
				throws IOException {
			final CombineFileSplit combined = (CombineFileSplit) split;
			return new CombineFileRecordReader(combined, context, CombineLineRecordReader.class);
		}
	}
	
	 /**
	  * Record reader for one file of a {@link CombineFileSplit}. All reading is
	  * delegated to a {@link LineRecordReader} that is initialised with a
	  * {@link FileSplit} describing the file at position {@code index} of the
	  * combined split.
	  *
	  * <p>NOTE(review): the three-argument constructor is presumably the shape
	  * {@code CombineFileRecordReader} expects when it instantiates this class;
	  * confirm against the Hadoop version in use before changing it.
	  */
	 static class CombineLineRecordReader<K, V> extends RecordReader<K, V> {
	    private CombineFileSplit split;
	    private TaskAttemptContext context;
	    /** Position of the file handled by this reader inside the combined split. */
	    private final int index;
	    /** Delegate doing the actual reading; {@code null} after {@link #close()}. */
	    private RecordReader<K, V> rr;

	    /**
	     * @param split   the combined split this reader belongs to
	     * @param context the task attempt context supplying the configuration
	     * @param index   position of the file to read inside {@code split}
	     */
	    public CombineLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
	        this.index = index;
	        this.split = split;
	        this.context = context;
	        this.rr = newLineReader();
	    }

	    /**
	     * Creates the line-oriented delegate from the current context's configuration.
	     * The unchecked cast mirrors the generic signature of this class; the
	     * delegate itself always produces the key/value types of {@link LineRecordReader}.
	     */
	    @SuppressWarnings("unchecked")
	    private RecordReader<K, V> newLineReader() {
	        return (RecordReader<K, V>) ReflectionUtils.newInstance(LineRecordReader.class, context.getConfiguration());
	    }

	    /**
	     * Points the delegate at the file selected by {@code index}.
	     *
	     * @param curSplit   the combined split; cast to {@link CombineFileSplit}
	     * @param curContext the task attempt context to use from now on
	     */
	    @Override
	    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
	        this.split = (CombineFileSplit) curSplit;
	        this.context = curContext;

	        if (null == rr) {
	            // Recreate the same kind of delegate the constructor built. The previous
	            // fallback created a SequenceFileRecordReader, which would have silently
	            // switched the file format for a reader that was closed and re-initialised.
	            rr = newLineReader();
	        }

	        // Carve the single file out of the combined split.
	        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
	                this.split.getOffset(index), this.split.getLength(index),
	                this.split.getLocations());

	        this.rr.initialize(fileSplit, this.context);
	    }

	    /** Returns the delegate's progress. */
	    @Override
	    public float getProgress() throws IOException, InterruptedException {
	        return rr.getProgress();
	    }

	    /** Closes the delegate, if any, and drops the reference to it. */
	    @Override
	    public void close() throws IOException {
	        if (null != rr) {
	            rr.close();
	            rr = null;
	        }
	    }

	    /** Returns the delegate's current key. */
	    @Override
	    public K getCurrentKey()
	    throws IOException, InterruptedException {
	        return rr.getCurrentKey();
	    }

	    /** Returns the delegate's current value. */
	    @Override
	    public V getCurrentValue()
	    throws IOException, InterruptedException {
	        return rr.getCurrentValue();
	    }

	    /** Advances the delegate to the next record. */
	    @Override
	    public boolean nextKeyValue() throws IOException, InterruptedException {
	        return rr.nextKeyValue();
	    }
	}
	
	// Output format: every record goes through a MyRecordWriter.
	static class MyOutputFormat extends FileOutputFormat<Text, Text> {
		/**
		 * Creates the writer for a task attempt.
		 *
		 * @param job the task attempt context handed to the new {@link MyRecordWriter}
		 * @return a fresh {@link MyRecordWriter}
		 * @throws IOException if the writer cannot be created
		 */
		@Override
		public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
				throws IOException, InterruptedException {
			final RecordWriter<Text, Text> writer = new MyRecordWriter(job);
			return writer;
		}
	}

	  /**
	   * Record writer that treats each record's key as a file name below the job
	   * output directory and appends the value, followed by a newline, to that
	   * file. One output stream per distinct key is kept open until
	   * {@link #close(TaskAttemptContext)}.
	   *
	   * <p>Records with an empty key are dropped. Records whose target file
	   * already exists when this writer first sees the key are dropped as well;
	   * this writer only writes to files it created itself.
	   */
	  public static class  MyRecordWriter extends RecordWriter<Text, Text> {
		private static final String LINESEPARATOR = "\n";
		/** Separator encoded once, in the same encoding Text uses for its bytes. */
		private static final byte[] LINE_SEPARATOR_BYTES =
				LINESEPARATOR.getBytes(java.nio.charset.Charset.forName("UTF-8"));

		/** Open streams, keyed by the record key (which is also the file name). */
		private final Map<String, FSDataOutputStream> outputMap =
				new HashMap<String, FSDataOutputStream>();
		private final FileSystem fs;
		private final JobContext job;

		/**
		 * @param job context providing the configuration and the output directory
		 * @throws IOException if the file system cannot be obtained
		 */
		public MyRecordWriter(JobContext job) throws IOException {
			this.job = job;
			this.fs = FileSystem.get(job.getConfiguration());
		}

		/**
		 * Appends {@code value} plus a newline to the file named after {@code key}.
		 * Modelled on MultipleOutputs.
		 *
		 * @param key   name of the target file relative to the job output directory
		 * @param value line content to append
		 * @throws IOException if the file cannot be created or written
		 */
		@Override
		public void write(Text key, Text value) throws IOException {
			String k = key.toString();
			if (k.isEmpty()) {
				return;
			}

			FSDataOutputStream out = outputMap.get(k);
			if (out == null) {
				Path outputPath = new Path(FileOutputFormat.getOutputPath(job), k);
				if (fs.exists(outputPath)) {
					// Not created by this writer: keep the original policy and skip the record.
					return;
				}
				out = fs.create(outputPath);
				outputMap.put(k, out);
			}

			// Text.getBytes() exposes the backing array, which can be longer than the
			// current content; only the first getLength() bytes are valid.
			out.write(value.getBytes(), 0, value.getLength());
			out.write(LINE_SEPARATOR_BYTES);
		}

		/**
		 * Closes every stream opened by this writer. All streams are attempted even
		 * if one of them fails; the first failure is rethrown afterwards.
		 *
		 * @param context unused
		 * @throws IOException the first error raised while closing a stream
		 */
		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			IOException firstFailure = null;
			for (FSDataOutputStream out : outputMap.values()) {
				try {
					out.close();
				} catch (IOException e) {
					if (firstFailure == null) {
						firstFailure = e;
					}
				}
			}
			outputMap.clear();
			if (firstFailure != null) {
				throw firstFailure;
			}
		}
	}

	
	/**
	 * Configures and runs the job: {@link ProvinceMapper} and
	 * {@link ProvinceReducer} with {@link MyOutputFormat}, reading from and
	 * writing to fixed paths. An existing output directory is deleted first.
	 *
	 * @param args command-line arguments; currently not used
	 * @return 0 if the job completed successfully, 1 otherwise
	 * @throws Exception if job setup or execution fails
	 */
	@Override
	public int run(String[] args) throws Exception {
		// Use the configuration ToolRunner injected so that generic options
		// (-D, -conf, ...) take effect; fall back to a fresh one when run() is
		// called directly without setConf().
		Configuration conf = getConf();
		if (conf == null) {
			conf = new Configuration();
		}

		Job job = new Job(conf);
		job.setJobName("TestCombine");
		job.setJarByClass(TestCombine.class);

		job.setMapperClass(ProvinceMapper.class);
		job.setReducerClass(ProvinceReducer.class);

		// The custom input format is left disabled, as in the original.
		//job.setInputFormatClass(CombineSequenceFileInputFormat.class);
		job.setOutputFormatClass(MyOutputFormat.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		String inpath = "/home/hadoop/tmp/combine";
		String outpath = "/home/hadoop/tmp/combineout";
		Path p = new Path(outpath);

		// Clear any previous output; the explicit recursive flag replaces the
		// deprecated single-argument delete.
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(p)) {
			fs.delete(p, true);
		}
		FileInputFormat.addInputPaths(job, inpath);
		FileOutputFormat.setOutputPath(job, p);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Entry point: runs {@link TestCombine} through {@link ToolRunner} and exits
	 * the JVM with the tool's return code.
	 */
	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new TestCombine(), args));
	}
} 

 

MR中简单实现自定义的输入输出格式

标签:blog   os   io   java   ar   for   div   cti   sp   

原文地址:http://www.cnblogs.com/chengxin1982/p/3961679.html

(0)
(0)
   
举报
评论 一句话评论(0)
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!