在旧的API中使用多文件输出,只需要自定义类继承MultipleTextOutputFormat类 重写它下面的generateFileNameForKeyValue 方法即可, 直接上例子。
输入文件 内容:
目的是按照 字母开头的文件输出,并统计单词计数,输出结果为:
代码如下:
package defined; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; /** * User: XD */ public class test { static final String INPUT_PATH = "hdfs://localhost:9000/input"; static final Path OUTPUT_PATH = new Path("hdfs://localhost:9000/output"); public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> { @Override public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { final String[] splited = value.toString().split(" "); for(String val : splited){ output.collect(new Text(val), new LongWritable(1)); } } } public static class ReduceClass extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> { @Override public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> collect, Reporter arg3) throws IOException { // TODO Auto-generated method stub long sum = 0L; while(values.hasNext()){ sum += values.next().get(); } collect.collect(key, new LongWritable(sum)); } } public static class PartitionFormat extends MultipleTextOutputFormat<Text, LongWritable> { @Override protected String generateFileNameForKeyValue(Text key , LongWritable value,String name){ char c = key.toString().toLowerCase().charAt(0); if(c>='a' && c<='z'){ return c+".txt"; }else{ return "other.txt"; } } } public static void main(String[] args) throws IOException, URISyntaxException { Configuration conf = new Configuration(); JobConf job = new JobConf(conf, test.class); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH),conf); final Path outPath = OUTPUT_PATH; if(filesystem.exists(outPath)){ filesystem.delete(outPath, true); } //1.1 读取文件 位置 FileInputFormat.setInputPaths(job, INPUT_PATH); //输出文件位置 FileOutputFormat.setOutputPath(job, OUTPUT_PATH); job.setJobName("Multipleoutput"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(PartitionFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setNumReduceTasks(1); JobClient.runJob(job); } }
输入:
还是统计输出到不同的文件。
输出结果:
结果是dest-r-00000文件下
代码:
package wordcount; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class wordcount { /** * @param args */ static final String INPUT_PATH = "hdfs://localhost:9000/input"; static final String OUTPUT_PATH = "hdfs://localhost:9000/output"; public static class Map extends Mapper<LongWritable , Text , Text , LongWritable>{ protected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{ final String[] splited = value.toString().split(" "); for(String val : splited){ context.write(new Text(val), new LongWritable(1)); } } } public static class Reduce extends Reducer<Text ,LongWritable, Text , LongWritable>{ private MultipleOutputs<Text,LongWritable> mos; String dest; protected void setup(Context context){ mos = new MultipleOutputs<Text, LongWritable>(context); } protected void reduce (Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{ long sum = 0L; char c = key.toString().toLowerCase().charAt(0); for(LongWritable val : values){ sum += val.get(); } if(c>='a' && c<='z'){ mos.write("dest", key, new LongWritable(sum)); }else{ mos.write("other", key, new LongWritable(sum)); } context.write(key, new LongWritable(sum)); } protected void cleanup(Context context) throws IOException, InterruptedException{ mos.close(); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { // TODO Auto-generated method stub Configuration conf = new Configuration(); final FileSystem filesystem = FileSystem.get(new URI(INPUT_PATH),conf); final Path outPath = new Path(OUTPUT_PATH); if(filesystem.exists(outPath)){ filesystem.delete(outPath, true); } Job job = new Job(conf,wordcount.class.getSimpleName()); //1.1 读取文件 位置 FileInputFormat.setInputPaths(job, INPUT_PATH); //1.2指定的map类//1.3 map输出的key value 类型 要是和最终的输出类型是一样的 可以省略 job.setMapperClass(Map.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setJarByClass(wordcount.class); //1.3 分区 job.setPartitionerClass(HashPartitioner.class); //1.4分组 //1.5 归约 //2.1 copy 经由网络 //2.2 指定自定义的reduce类 job.setReducerClass(Reduce.class); //指定 reduce的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //2.3指定写出到什么位置 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); MultipleOutputs.addNamedOutput(job, "dest", TextOutputFormat.class, Text.class, LongWritable.class); MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, Text.class, LongWritable.class); //提交到jobtracker执行。 此函数还将会打印出作业执行的详细信息 job.waitForCompletion(true); } }
原文地址:http://blog.csdn.net/xd_122/article/details/41553567