码迷,mamicode.com
首页 > 其他好文 > 详细

数据输入输出格式

时间:2015-08-15 13:14:57      阅读:291      评论:0      收藏:0      [点我收藏+]

标签:

数据输入格式

数据输入格式(InputFormat)用于描述MR作业的输入规范,主要功能:输入规范检查(比如输入文件目录的检查)、对数据文件进行输入切分和从输入分块中将数据记录逐一读取出来、并转化为Map的输入键值对

Hadoop中最常用的数据输入格式包括:TextInputFormat 和 KeyValueInputFormat。

1)TextInputFormat 是系统默认的数据输入格式,可以将文件的每一行解析成一个键值对。其中,Key是当前行在整个文件中的字节偏移量,而Value就是该行的内容。默认的RecordReaderLineRecordReader

2)KeyValueInputFormat是将一个按照<key,value>格式存放的文本文件逐行读出,并自动解析生成相应的keyvalue默认是KeyValueLineRecordReader 

定制数据输入格式 

用户可以从基类InputFormatRecordReader开始定制过程,主要实现InputFormat中的createRecordReader()和getSplits()两个抽象方法,而RecordReader中则需要实现gerCurrentKey()和getCurrentValue()几个抽象方法。

需求:为了能更细粒的记录每个单词在文档中出现时的行位置信息FileName@LineOffset。 

  • 方法一:使用默认的TextInputFormat和LineRecordReader 
技术分享
public static class IIMapper extends Mapper<Text, Text, Text, Text>{
        @Override
        //输出key:word         输出value:FileName@LineOffset
        protected void map(Text key, Text value,Context context)
                throws IOException, InterruptedException {
            
            //得到输入文件的文件名FileName
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String name = fileSplit.getPath().getName();
            
            //组装拼接Value: FileName@LineOffset
            Text fileName_lineOffset=new Text(name+"@"+key.toString());
            
            String[] splited = value.toString().split("\t");            
            for(String word : splited){
                context.write(new Text(word), fileName_lineOffset);
            }
   }
   }    
View Code
  • 方法二:定制FileNameInputFormatFileNameRecordReader。本例是基于已有的TextInputFormatLineRecordReader两个类来完成的。 
技术分享
package invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FileNameRecordReader extends RecordReader<Text, Text> {
    
    //成员变量
    String fileName;    
    //实例化一个LineRecordReader实例
    LineRecordReader lrr=new LineRecordReader();
    
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        
        //调用LineRecordReader类的初始化方法
        lrr.initialize(split, context);
        
        //获取当前InputSplit的文件名
        fileName=((FileSplit)split).getPath().getName();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        
        //调用LineRecordReader类的方法,拼接key
        //其中lrr.getCurrentKey()返回:当前行在整个文本文件中的字节偏移量
        return new Text("("+fileName+"@"+lrr.getCurrentKey().toString()+")");
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        
        //调用LineRecordReader类的方法
        return lrr.getCurrentValue();    
}    

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        
        return lrr.nextKeyValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        
        return lrr.getProgress();
    }

    @Override
    public void close() throws IOException {
        
        lrr.close();
    }
}

package invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FileNameInputFormat extends FileInputFormat<Text, Text>{

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        
        FileNameRecordReader fnrr = new FileNameRecordReader();
        
        //调用FileNameRecordReader的初始化方法
        fnrr.initialize(split, context);
        
        return fnrr;
    }    
}
View Code
  •  使用自定义的FileNameInputFormat类和FileNameRcordReader: 
技术分享
package invertedIndex;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(InvertedIndex.class);
        
        //设置数据输入格式【使用自定义的InputFormat】
        job.setInputFormatClass(FileNameInputFormat.class);
        
        job.setMapperClass(FFMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.waitForCompletion(true);        
    }
    
    public static class FFMapper extends Mapper<Text, Text, Text, Text>{
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            
            //分词
            StringTokenizer st = new StringTokenizer(value.toString());
            for(;st.hasMoreTokens();){
                
                //key:单词word        value:FileName+偏移量
                context.write(new Text(st.nextToken()), key);
            }
        }
    }
}
View Code

 输出结果为:key:单词,valueFileName@偏移量

read (data1@0)

file (data1@0)

read (data1@11)

data (data1@11)  

数据输出格式 

数据输出格式(OutputFormat)用于描述MR作业的数据输出规范。主要功能:输出规范检查(如检查输出目录是否存在),以及提供作业结果数据输出功能。 

Hadoop默认的数据输出格式是TextOutputFormat,可以将结果以key+\t+value的形式逐行输出。默认的RecordWriterLineRecordWriter。 

数据输入输出格式

标签:

原文地址:http://www.cnblogs.com/skyl/p/4732290.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!