码迷,mamicode.com
首页 > 其他好文 > 详细

hadoop 自定义OutputFormat

时间:2019-08-31 23:27:18      阅读:65      评论:0      收藏:0      [点我收藏+]

标签:color   cte   main   void   循环   存储   stream   组件   turn   

1、继承FileOutputFormat,复写getRecordWriter方法

/**
 * @Description:自定义outputFormat,输出数据到不同的文件
 */
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new FRecordWriter(job);
    }
}

2、实现RecordWriter

/**
 * @Description: 继承RecordWriter,实现数据输出到不同目录文件
 */
public class FRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream out1 = null;
    FSDataOutputStream out2 = null;

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 判断是否包含“baidu”和"alibaba"字符串,输出到不同文件
        if (key.toString().contains("baidu") || key.toString().contains("alibaba")) {
            out1.write(key.toString().getBytes());
        } else {
            out2.write(key.toString().getBytes());
        }

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(out1);
        IOUtils.closeStream(out2);
    }

    public FRecordWriter(TaskAttemptContext job) {
        FileSystem fs;
        try {
            Path path1 = new Path("output1/a.log");
            Path path2 = new Path("output2/b.log");
            System.out.println(path1.getName());
            System.out.println(path2.getName());
            fs = FileSystem.get(job.getConfiguration());
            out1 = fs.create(path1);
            out2 = fs.create(path2);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

3、map

/**
 * @Description: 按行读取,按行写入
 */
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}

4、reducer

public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    private Text newLine = new Text();
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        //循环null值的values是防止key里有重复的数据没有被取出
        //Iterable<NullWritable> values迭代器里存储了key和value(虽然本例中value都是null值)
        //通过循环迭代器,迭代器里的key值也会被不断取出赋值到Text key中(公用内存地址)
        for (NullWritable value : values) {
            newLine.set(key.toString()+"\r\n");
            context.write(newLine,value);
        }
    }
}

5、driver

/**
 * @Description: 自定义输出 
 * 实现对样本按行分割,判断是否包含baidu或alibaba字符串,
 * 包含则写入目录1,不包含写入目录2,
 */
public class FilterDriver {

   public static void main(String args[]) throws Exception{
       if(args.length!=2)
       {
           System.err.println("使用格式:FilterDriver <input path> <output path>");
           System.exit(-1);
       }


       Configuration conf = new Configuration();
       Job job = Job.getInstance(conf);

       job.setJarByClass(FilterDriver.class);
       job.setMapperClass(FilterMapper.class);
       job.setReducerClass(FilterReducer.class);

       job.setMapOutputKeyClass(Text .class);
       job.setMapOutputValueClass(NullWritable .class);

       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(NullWritable.class);

       // 要将自定义的输出格式组件设置到job中
       job.setOutputFormatClass(FilterOutputFormat.class);

       FileInputFormat.setInputPaths(job, new Path(args[0]));

       // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
       // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       Path outPath = new Path(args[1]);
       FileSystem fs = FileSystem.get(conf);
       if(fs.exists(outPath)){
           fs.delete(outPath,true);
       }

       boolean result = job.waitForCompletion(true);
       System.exit(result ? 0 : 1);
   }


}

 

 

hadoop 自定义OutputFormat

标签:color   cte   main   void   循环   存储   stream   组件   turn   

原文地址:https://www.cnblogs.com/asker009/p/11440866.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!