
Hadoop Programming Tips (7): Custom Output File Formats and Writing to Different Directories



Code test environment: Hadoop 2.4

Use case: this technique applies when you need to customize the output data format, including how the data is rendered, the output path, and the output file names.

Hadoop's built-in output formats include:

1) FileOutputFormat<K,V>: the commonly used abstract parent class;

2) TextOutputFormat<K,V>: the default format, which writes records as plain text;

3) SequenceFileOutputFormat<K,V>: writes binary SequenceFile output;

4) MultipleOutputs<K,V>: routes output data to different files or directories (strictly a helper class used alongside an output format rather than an OutputFormat itself);

5) NullOutputFormat<K,V>: sends all output to /dev/null, i.e. writes nothing; this is useful when the map/reduce logic does its real output itself and no framework output is needed;

6) LazyOutputFormat<K,V>: creates a file only when write is actually called, so no empty files are produced if write is never invoked.
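Note that if all you need is a different key/value separator for item 2, a custom format is not strictly required: TextOutputFormat reads its separator from the job configuration. A minimal sketch inside a driver's run method, assuming the Hadoop 2.x property name mapreduce.output.textoutputformat.separator:

Configuration conf = getConf();
// Make TextOutputFormat emit "key,value" instead of the default tab-separated pair.
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "separator demo");
job.setOutputFormatClass(TextOutputFormat.class);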

Steps:

As with custom input formats, a custom output format can be built along the following lines (Example 1 below implements each step):

1) Define a class that extends OutputFormat; in practice extending FileOutputFormat is usually sufficient;

2) Implement its getRecordWriter method to return a RecordWriter;

3) Define a class that extends RecordWriter and implement its write method, which writes each <key, value> pair out to the file.


Example 1 (changing the default output file name and the default key/value separator):

Input data:

(screenshot of the input data, not preserved)
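With TextInputFormat the input is simply lines of text; a hypothetical sample consistent with Example 2 below, where routing depends on whether a line starts with "ignore", might look like:

ignore this line
some other line
ignore another line
yet more data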

The custom CustomOutputFormat (replaces the default output file-name prefix, so each reducer's file becomes custom_<task number>, e.g. custom_00000):

package fz.outputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomOutputFormat extends FileOutputFormat<LongWritable, Text> {

	private String prefix = "custom_";

	@Override
	public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {
		// Build the output path <outputDir>/custom_<last five digits of the task ID>
		// and open a writable stream for it.
		Path outputDir = FileOutputFormat.getOutputPath(job);
		String taskId = job.getTaskAttemptID().getTaskID().toString();
		Path path = new Path(outputDir, prefix + taskId.substring(taskId.length() - 5));
		FSDataOutputStream fileOut = path.getFileSystem(job.getConfiguration()).create(path);
		return new CustomRecordWriter(fileOut);
	}

}
The custom CustomRecordWriter (specifies the key/value separator):

package fz.outputformat;

import java.io.IOException;
import java.io.PrintWriter;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CustomRecordWriter extends RecordWriter<LongWritable, Text> {

	private PrintWriter out;
	private String separator = ",";

	public CustomRecordWriter(FSDataOutputStream fileOut) {
		out = new PrintWriter(fileOut);
	}

	@Override
	public void write(LongWritable key, Text value) throws IOException,
			InterruptedException {
		// Emit each record as "key,value" instead of the default tab-separated pair.
		out.println(key.get() + separator + value.toString());
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException,
			InterruptedException {
		out.close();
	}

}

The driver class:

package fz.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class FileOutputFormatDriver extends Configured implements Tool{

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new Configuration(), new FileOutputFormatDriver(), args));
	}

	@Override
	public int run(String[] arg0) throws Exception {
		if(arg0.length!=3){
			System.err.println("Usage:\nfz.outputformat.FileOutputFormatDriver <in> <out> <numReducer>");
			return -1;
		}
		Configuration conf = getConf();

		Path in = new Path(arg0[0]);
		Path out = new Path(arg0[1]);
		// Delete the output directory if it already exists so the job can be rerun.
		boolean delete = out.getFileSystem(conf).delete(out, true);
		System.out.println("deleted " + out + "? " + delete);
		Job job = Job.getInstance(conf, "fileoutputformat test job");
		job.setJarByClass(getClass());
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(CustomOutputFormat.class);
		
		job.setMapperClass(Mapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(Integer.parseInt(arg0[2]));
		job.setReducerClass(Reducer.class);
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		return job.waitForCompletion(true)?0:-1;
	}

}
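The job can then be submitted from the command line; the jar name and HDFS paths below are hypothetical:

hadoop jar outputformat.jar fz.outputformat.FileOutputFormatDriver /user/fz/input /user/fz/output 1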

Checking the output:

(screenshots of the output file and its contents, not preserved)

The results confirm that the output format (comma-separated key/value pairs) and the file names (custom_00000 and so on, one per reducer) come out as intended; since the identity Mapper and Reducer are used, each key is simply the byte offset supplied by TextInputFormat.


Example 2 (routing data to different directories based on key and value):

The custom driver class (in fact only the output configuration changes):

package fz.multipleoutputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class FileOutputFormatDriver extends Configured implements Tool{

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new Configuration(), new FileOutputFormatDriver(), args));
	}

	@Override
	public int run(String[] arg0) throws Exception {
		if(arg0.length!=3){
			System.err.println("Usage:\nfz.multipleoutputformat.FileOutputFormatDriver <in> <out> <numReducer>");
			return -1;
		}
		Configuration conf = getConf();

		Path in = new Path(arg0[0]);
		Path out = new Path(arg0[1]);
		// Delete the output directory if it already exists so the job can be rerun.
		boolean delete = out.getFileSystem(conf).delete(out, true);
		System.out.println("deleted " + out + "? " + delete);
		Job job = Job.getInstance(conf, "multipleoutputformat test job");
		job.setJarByClass(getClass());
		
		job.setInputFormatClass(TextInputFormat.class);
		// No custom OutputFormat this time; instead register two named outputs
		// that the reducer routes records to via MultipleOutputs.
		MultipleOutputs.addNamedOutput(job, "ignore", TextOutputFormat.class,
				LongWritable.class, Text.class);
		MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class,
				LongWritable.class, Text.class);

		job.setMapperClass(Mapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(Integer.parseInt(arg0[2]));
		job.setReducerClass(MultipleReducer.class);
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		return job.waitForCompletion(true)?0:-1;
	}

}
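As before, the job can be submitted with hypothetical jar and path names:

hadoop jar outputformat.jar fz.multipleoutputformat.FileOutputFormatDriver /user/fz/input /user/fz/multipleout 1

Note that MultipleOutputs still writes under the single directory passed to FileOutputFormat.setOutputPath; the fourth argument of out.write in the reducer below becomes the file-name prefix, so the named outputs appear as files such as ign-r-00000 and oth-r-00000.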
The custom reducer (custom logic is needed to decide, from the key and value, where each record goes):

package fz.multipleoutputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleReducer extends
		Reducer<LongWritable, Text, LongWritable, Text> {

	private MultipleOutputs<LongWritable, Text> out;

	@Override
	public void setup(Context cxt) {
		out = new MultipleOutputs<LongWritable, Text>(cxt);
	}

	@Override
	public void reduce(LongWritable key, Iterable<Text> value, Context cxt)
			throws IOException, InterruptedException {
		// Route each record to a named output; the fourth argument is the
		// base output path, which becomes the output file-name prefix.
		for (Text v : value) {
			if (v.toString().startsWith("ignore")) {
				out.write("ignore", key, v, "ign");
			} else {
				out.write("other", key, v, "oth");
			}
		}
	}

	@Override
	public void cleanup(Context cxt) throws IOException, InterruptedException {
		out.close();
	}
}
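To route records into true subdirectories rather than differently prefixed files in the same directory, the baseOutputPath passed to MultipleOutputs.write may itself contain "/". A minimal sketch of the alternative calls in the reduce method above:

// Writes to <outputDir>/ignore/part-r-00000 and <outputDir>/other/part-r-00000
// instead of <outputDir>/ign-r-00000 and <outputDir>/oth-r-00000.
out.write("ignore", key, v, "ignore/part");
out.write("other", key, v, "other/part");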

Checking the output:

(screenshot of the output directory listing, not preserved)

The output shows that records are indeed written to different files according to their value. However, the default part files are still generated, each with size 0; the original post left this unresolved, but a likely fix using LazyOutputFormat is sketched below.
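LazyOutputFormat (item 6 in the list above) defers creating the default output files until the framework's write is actually called; since this reducer writes only through MultipleOutputs, the empty part files should disappear. A minimal sketch of the change in the driver, assuming the Hadoop 2.x mapreduce API:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
...
// Wrap the default TextOutputFormat so part files are created lazily,
// i.e. only when context.write is actually invoked.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);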


Summary: a custom output format lets you satisfy special requirements, but in most cases Hadoop's built-in formats are sufficient, so the technique's practical value is limited.

Hadoop's built-in MultipleOutputs, on the other hand, which routes records to different files or directories according to their content, is genuinely useful in practice.


Share, grow, enjoy.

Please credit this blog when reposting: http://blog.csdn.net/fansy1990




Original post: http://www.cnblogs.com/mengfanrong/p/5071220.html
