标签:des style blog http color io os 使用 java
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,其执行流程如图1。这两个函数的形参是key、value对,表示函数的输入信息。
图 1
现在看一下Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型。map 函数定义如下代码1.1。
1 protected void map(KEYIN key, VALUEIN value, 2 Context context) throws IOException, InterruptedException { 3 context.write((KEYOUT) key, (VALUEOUT) value); 4 }
代码 1.1
在上面的代码中,输入参数key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map 函数。在这里,map 函数没有处理输入的key、value,直接通过context.write(…)方法输出了,输出的key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。
再看一下Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型,和输出的key、value 类型。看一下reduce 函数定义,如下代码1.2。
1 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 2 ) throws IOException, InterruptedException { 3 for(VALUEIN value: values) { 4 context.write((KEYOUT) key, (VALUEOUT) value); 5 } 6 }
代码 1.2
在上面代码中,reduce 函数的形参key、value 的类型是KEYIN、VALUEIN。要注意这里的value 是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的key,通过调用context.write(…)输出了,这里输出的类型是KEYOUT、VALUEOUT。通常我们会根据业务逻辑覆盖reduce 函数的实现。
MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中。整个流程如图3.1
图2.1
每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。整个Mapper 任务的处理过程又可以分为以下几个阶段,如图3.2
图2.2
在图3.2中,把Mapper 任务的运行过程分为六个阶段。
第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper 进程处理。这里的三个输入片,会有三个Mapper 进程处理。
第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。
第三阶段是调用Mapper 类中的map 方法。第二阶段中解析出来的每一个键值对,调用一次map 方法。如果有1000 个键值对,就会调用1000 次map 方法。每一次调用map 方法会输出零个或者多个键值对。map具体的工作做有我们自己来决定,我们要对map函数进行覆盖,封装我们要进行的操作来实现我们最终的目的。
第四阶段是按照一定的规则对第三阶段的每个Mapper任务输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer 任务运行的数量。默认只有一个Reducer 任务。
第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux 文件中。
第六阶段是对数据进行归约处理,也就是reduce 处理。对于键相等的键值对才会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linxu 文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。
每个Reducer 任务是一个java 进程。Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,可以分为如图2.3 所示的几个阶段
图2.3
在图3.2中,把Mapper 任务的运行过程分为四个阶段。
第一阶段是Reducer 任务会主动从Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此Reducer 会复制多个Mapper 的输出。
第二阶段是把复制到Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
第三阶段是对排序后的键值对调用reduce 方法。键相等的键值对调用一次reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS 文件中。
在整个MapReduce 程序的执行过程中如图2.4,我可以根据上面的讲解来分析下面MapReducer执行过程,从下图可知每个Mapper任务分了两个区,因此会有两个Reducer任务,最终产生两个HDFS副本。
图 2.4
在对Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,为避免混淆,所以我在这里对键值对进行编号,方便大家理解键值对的变化情况。如图2.5
图 2.5
在图2.5 中,对于Mapper 任务输入的键值对,定义为key1 和value1。在map 方法中处理后,输出的键值对,定义为key2 和value2。reduce 方法接收key2 和value2,处理后,输出key3 和value3。在下文讨论键值对时,可能把key1 和value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和value3 简写为<k3,v3>。
该业务要求统计指定文件中的所有单词的出现次数。下面看一下源文件的内容为:
“hello you”
“hello me”
内容很简单,两行文本,每行的单词中间使用空格区分。
程序源码如下代码 2.1。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 Configuration conf = new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outPath = new Path(OUT_PATH); 28 if(fileSystem.exists(outPath)){ 29 fileSystem.delete(outPath, true); 30 } 31 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 32 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里 33 34 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 35 job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类 36 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 37 job.setMapOutputValueClass(LongWritable.class); 38 39 job.setPartitionerClass(HashPartitioner.class);//1.3 分区 40 job.setNumReduceTasks(1);//有一个reduce任务运行 41 //1.4 TODO 排序、分组 42 //1.5 TODO 规约 43 job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类 44 job.setOutputKeyClass(Text.class);//指定reduce的输出类型 45 job.setOutputValueClass(LongWritable.class);//2.3 指定写出到哪里 46 FileOutputFormat.setOutputPath(job, outPath);//指定输出文件的格式化类 47 48 job.setOutputFormatClass(TextOutputFormat.class); 49 50 job.waitForCompletion(true);//把job提交给JobTracker运行 51 } 52 53 /** 54 * KEYIN 即k1 表示行的偏移量 55 * VALUEIN 即v1 表示行文本内容 56 * KEYOUT 即k2 表示行中出现的单词 57 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 58 */ 59 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 60 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 61 62 final String line = v1.toString(); 63 final String[] splited = line.split(" "); 64 for (String word : splited) { 65 context.write(new Text(word), new LongWritable(1)); 66 } 67 }; 68 } 69 70 /** 71 * KEYIN 即k2 表示行中出现的单词 72 * VALUEIN 即v2 表示行中出现的单词的次数 73 * KEYOUT 即k3 表示文本中出现的不同单词 74 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 75 * 76 */ 77 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 78 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 79 long times = 0L; 80 for (LongWritable count : v2s) { 81 times += count.get(); 82 } 83 ctx.write(k2, new LongWritable(times)); 84 }; 85 } 86 87 }
代码 2.1
map 方法代码如下,代码2.2。
1 static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 2 final Text key2 = new Text();//key2 表示该行中某一单词 3 final IntWritable value2 = new IntWritable(1);//value2 表示单词在该行中的出现次数 4 //key 表示文本行的起始位置 5 //value 表示文本行 6 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { 7 final String[] splited = value.toString().split(" "); 8 for (String word : splited) { 9 key2.set(word); 10 context.write(key2, value2); 11 } 12 }; 13 }
代码 2.2
上面代码中,注意Mapper 类的泛型不是java 的基本类型,而是Hadoop 的数据类型LongWritable、Text、IntWritable。读者可以简单的等价为java 的类long、String、int。下文会有专门讲解Hadoop 的数据类型。
代码中Mapper 类的泛型依次是<k1,v1,k2,v2>。map 方法的第二个形参是行文本内容,是我们关心的。核心代码是把行文本内容按照空格拆分,把每个单词作为新的键,数值1作为新的值,写入到上下文context 中。在这里,因为输出的是每个单词,所以出现次数是常量1。如果一行文本中包括两个hello,会输出两次<hello,1>。
Reduce方法代码如下,代码2.3
1 /** 2 * key 表示单词 3 * values 表示map方法输出的1的集合 4 * context 上下文对象 5 */ 6 static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 7 final IntWritable value3 = new IntWritable(0);//value3表示单词出现的总次数 8 protected void reduce(Text key, java.lang.Iterable<IntWritable> values,Context context) throws java.io.IOException ,InterruptedException { 9 int sum = 0; 10 for (IntWritable count : values) { 11 sum += count.get(); 12 } 13 final Text key3 = key; 14 value3.set(sum);//执行到这里,sum表示该单词出现的总次数 15 context.write(key3, value3);//key3表示单词,是最后输出的key,value3表示单词出现的总次数,是最后输出的value 16 }; 17 }
代码 2.3
上面代码中,Reducer 类的四个泛型依次是<k2,v2,k3,v3>,要注意reduce 方法的第二个参数是java.lang.Iterable 类型,迭代的是v2。也就是k2 相同的v2 都可以迭代出来。以上就是我们覆盖的map 方法和reduce 方法。现在要把我们的代码运行起来,需要写驱动代码,如下代码 2.4
1 /** 2 * 驱动代码 3 */ 4 public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException { 5 final String INPUT_PATH = "hdfs://hadoop:9000/input";//输入路径 6 final String OUTPUT_PATH = "hdfs://hadoop:9000/output";//输出路径,必须是不存在的 7 final Job job = new Job(new Configuration(),"WordCountApp");//创建一个job对象,封装运行时需要的所有信息 8 job.setJarByClass(WordCountApp.class);//如果需要打成jar运行,需要下面这句 9 FileInputFormat.setInputPaths(job, INPUT_PATH);//告诉job执行作业时输入文件的路径 10 job.setInputFormatClass(TextInputFormat.class);//设置把输入文件处理成键值对的类 11 job.setMapperClass(MyMapper.class);//设置自定义的Mapper类 12 job.setMapOutputKeyClass(Text.class);//设置map方法输出的k2、v2的类型 13 job.setMapOutputValueClass(IntWritable.class); 14 job.setPartitionerClass(HashPartitioner.class);//设置对k2分区的类 15 job.setNumReduceTasks(1);//设置运行的Reducer任务的数量 16 job.setReducerClass(MyReducer.class);//设置自定义的Reducer类 17 job.setOutputKeyClass(Text.class);//设置reduce方法输出的k3、v3的类型 18 job.setOutputValueClass(IntWritable.class); 19 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//告诉job执行作业时的输出路径 21 job.waitForCompletion(true);//让作业运行,直到运行结束,程序退出 22 }
代码 2.4
在以上代码中,我们创建了一个job 对象,这个对象封装了我们的任务,可以提交到Hadoop 独立运行。最后一句job.waitForCompletion(true),表示把job 对象提交给Hadoop 运行,直到作业运行结束后才可以。
以上代码的运行方式有两种,一种是在宿主机的eclipse 环境中运行,一种是打成jar包在linux 中运行。
第一种运行方式要求宿主机能够访问linux,并且对于输入路径和输出路径中的主机名hadoop , 要在宿主机的hosts 文件中有绑定,我的hosts 文件位于C:\WINDOWS\system32\drivers\etc 文件夹。
第二种运行方式,需要把代码打成jar 包,在linux 下执行命令hadoop jar xxx.jar 运行,运行结束后,文件路径在hdfs://hadoop0:9000/output/part-r-00000。我们看一下输出结果,如图2.6所示。
图2.6
序列化是干什么用的?本质上讲,就是数据保存到java 虚拟机之外,然后又被读到java虚拟机内.如果仅仅是保存,不管是否能读进java 虚拟机的话,就不关心序列化问题了。正是
因为需要被读进java 虚拟机,所以必须识别写出、读入的格式、字符顺序等问题。因此序列化也就是比较重视的事情了。拿密码来打比方。序列化就像加密,反序列化就像解密。
Hadoop 作为分布式存储系统必然涉及到序列化问题。
在前面的例子中,我们看到Mapper、Reducer 类中都使用了Hadoop 自己的数据类型LongWritable、IntWritable、Text。这些数据类型都有一个共同的特点,就是实现了
org.apache.hadoop.io.Writable 接口。我们看一下这个接口的源码,如下代码3.1。
1 package org.apache.hadoop.io; 2 import java.io.DataOutput; 3 import java.io.DataInput; 4 import java.io.IOException; 5 6 public interface Writable { 7 void write(DataOutput out) throws IOException; 8 void readFields(DataInput in) throws IOException; 9 }
代码 3.1
从上面的代码中可以看到Writable 接口只有两个方法,一个是writer 方法,一个是readFields 方法。前者是把对象的属性序列化到DataOutput 中去,后者是从DataInput 把数据反序列化到对象的属性中。
java 中的基本类型有char、byte、boolean、short、int、float、double 共7 中基本类型,除了char,都有对应的Writable 类型。对于int 和long 除了IntWritable、LongWritable 外,还有对应的VintWritable、VlongWritable。除此类型之外,还有字符串类型Text、字节数组类型BytesWritable、空类型NullWritable、对象类型Object Writable。以上这些类型构成了mapreduce 运算的基本类型。这些类型都实现了接口WritableComparable,如下代码3.2。
1 package org.apache.hadoop.io; 2 public interface WritableComparable<T> extends Writable, Comparable<T> { 2 }
代码 3.2
从上面代码中可以看到, 这个接口仅仅多了Comparable 接口。实现java.lang.Comparable 接口的目的是为了调用equals 方法进行比较。
我们看一下LongWritable 类的源码,如下代码3.3
1 package org.apache.hadoop.io; 2 3 import java.io.*; 4 5 /** A WritableComparable for longs. */ 6 public class LongWritable implements WritableComparable { 7 private long value; 8 9 public LongWritable() {} 10 11 public LongWritable(long value) { set(value); } 12 13 /** Set the value of this LongWritable. */ 14 public void set(long value) { this.value = value; } 15 16 /** Return the value of this LongWritable. */ 17 public long get() { return value; } 18 19 public void readFields(DataInput in) throws IOException { 20 value = in.readLong(); 21 } 22 23 public void write(DataOutput out) throws IOException { 24 out.writeLong(value); 25 }
代码 3.3
从上面代码中可以看到,该类实现了WritableComparable 接口,内部有个long 类型的属性value,在readFields 方法中从in 中把long 类型的值读出来,赋给value,这是“反序列化”过程;在write 方法中把value 写入到out 中,这是“序列化”过程。
上传文件时,如果文件的size小于block 的size,那么每个文件就会占用一个block(不是64MB,而是文件实际大小)。如果有非常多的小文件需要上传,那么就需要非常多的block。每一个block 在NameNode 的内存中都会有一个描述信息,这样会占用过多的NameNode 内存。
SequenceFile 可以把大量小文件一起放到一个block 中。在存储相同数量的文件时,可以明显减少block 的数量。
假设有3 个小文件,那么会产生3 个block,那么4 个文件存储后对应的block 如图表3.1:
文件 | file1(大小16M) | file2(大小15M) | file3(大小16M) |
block | block1(大小16M) | block2(大小15M) | block3(大小16M) |
表3.1
如果使用SequenceFile 存储后,只会产生一个block,如表3.2:
文件 | file1(大小16M) | file2(大小15M) | file3(大小16M) |
block | block(大小47M) |
表3.2
可以看出,同样多的小文件,但是占用的block 明显少了。这就是SequenceFile 的作用。另外,SequenceFile 还可以压缩存储的内容,进一步减少文件体积。
类InputFomat 是负责把HDFS 中的文件经过一系列处理变成map 函数的输入部分的。这个类做了三件事情:
第一, 验证输入信息的合法性,包括输入路径是否存在等;
第二,把HDFS 中的文件按照一定规则拆分成InputSplit,每个InputSplit 由一个Mapper执行;
第三,提供RecordReader,把InputSplit 中的每一行解析出来供map 函数处理;
我们看一下这个类的源码,如下代码3.4。
1 public abstract class InputFormat<K, V> { 2 3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; 4 5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException; 6 7 }
代码 3.4
从图上面代码中可以看到,该类只有两个方法的声明,方法getSplits 的作用是把输入文件划分为很多的输入分片,方法createRecordReader 的作用是输入分片的记录读取器。这些方法的实现都在子类中。
InputFormat 有个子类是FileInputFormat,这是在我们的例子中见到的,我们看一下该类对getSplits 方法的实现,如下代码3.5。
1 public List<InputSplit> getSplits(JobContext job 2 ) throws IOException { 3 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 4 long maxSize = getMaxSplitSize(job); 5 6 // generate splits 7 List<InputSplit> splits = new ArrayList<InputSplit>(); 8 List<FileStatus>files = listStatus(job); 9 for (FileStatus file: files) { 10 Path path = file.getPath(); 11 FileSystem fs = path.getFileSystem(job.getConfiguration()); 12 long length = file.getLen(); 13 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); 14 if ((length != 0) && isSplitable(job, path)) { 15 long blockSize = file.getBlockSize(); 16 long splitSize = computeSplitSize(blockSize, minSize, maxSize); 17 18 long bytesRemaining = length; 19 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 20 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 21 splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 22 blkLocations[blkIndex].getHosts())); 23 bytesRemaining -= splitSize; 24 } 25 26 if (bytesRemaining != 0) { 27 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 28 blkLocations[blkLocations.length-1].getHosts())); 29 } 30 } else if (length != 0) { 31 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); 32 } else { 33 //Create empty hosts array for zero length files 34 splits.add(new FileSplit(path, 0, length, new String[0])); 35 } 36 } 37 38 // Save the number of input files in the job-conf 39 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 40 41 LOG.debug("Total # of splits: " + splits.size()); 42 return splits; 43 }
代码 3.5
在上面代码中,第3 行计算minSize,是供后面计算使用的,其中getFormatMinSplitSize()方法的值是1,getMinSplitSize(job)方法的值由配置参数mapred.min.split.size 指定,默认值是1,所以minSize 的默认值就是1。第4 行计算maxSize,是供后面计算使用的,值由配置参数mapred.max.split.size 指定,默认值是long 的最大值。第8行files 列表中存放的是输入文件,可能有多个。从第9 行开始,循环处理每一个输入文件。第10 行是获得文件路径,第12 行是获得文件长度,第13行是获得文件块位置。如果文件非空,并且文件允许被分割为输入块,那么就进入第14行的条件判断中。第15 行是读取文件块size,默认是64MB,第260 行是计算输入块size,我们看一下computeSplitSize 方法,如下代码3.6。
1 protected long computeSplitSize(long blockSize, long minSize,long maxSize) { 2 return Math.max(minSize, Math.min(maxSize, blockSize)); 3 }
代码 3.6
从上面代码中可以看出,输入块size 由三个因素决定,分别是minSize、maxSize、blockSize。根据前面的数值,可以得知,输入分片的默认size 是文件块size。
我们回到代码3.5,getSplits 方法的代码中继续分析,在第19至24行的循环体中,是对文件按照输入分片size 进行切分。
总结一下上面的分析,如果输入文件有3 个,那么产生的输入分片的情况如表3.3 所示.
文件大小 | 产生的输入片 | |
输入文件1 | 63MB | 1 个 |
输入文件2 | 64MB | 1 个 |
输入文件3 | 65MB | 2 个 |
注:参数mapred.min.split.size、mapred.max.split.size、dfs.block.size 采用默认值 |
表3.3
源码在JobInProcess 中,如下:
1 TaskSplitMetaInfo[] splits = createSplits(jobId); 2 if (numMapTasks != splits.length) { 3 throw new IOException("Number of maps in JobConf doesn‘t match number of " + 4 "recieved splits for job " + jobId + "! " + 5 "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); 6 } 7 numMapTasks = splits.length;
该类中有个很重要的方法是实现TextInputFormat 中的createRecordReader,如代码3.7
1 public class TextInputFormat extends FileInputFormat<LongWritable, Text> { 2 3 @Override 4 public RecordReader<LongWritable, Text> 5 createRecordReader(InputSplit split, 6 TaskAttemptContext context) { 7 return new LineRecordReader(); 8 } 9 10 @Override 11 protected boolean isSplitable(JobContext context, Path file) { 12 CompressionCodec codec = 13 new CompressionCodecFactory(context.getConfiguration()).getCodec(file); 14 if (null == codec) { 15 return true; 16 } 17 return codec instanceof SplittableCompressionCodec; 18 } 19 }
代码3.7
在代码3.7 中,该方法直接返回一个实例化的LineRecordReader 类,我们看一下这个类,如代码3.8。
1 public class LineRecordReader extends RecordReader<LongWritable, Text> { 2 private static final Log LOG = LogFactory.getLog(LineRecordReader.class); 3 4 private CompressionCodecFactory compressionCodecs = null; 5 private long start; 6 private long pos; 7 private long end; 8 private LineReader in; 9 private int maxLineLength; 10 private LongWritable key = null; 11 private Text value = null; 12 private Seekable filePosition; 13 private CompressionCodec codec; 14 private Decompressor decompressor; 15 }
代码3.8
在代码3.8 中,可以看到该类的几个属性,其中start、pos、end 表示文件中字节的位置,key 和value 表示从记录中解析出的键和值,in 是一个行内容的读取器。
继续分析其中的initialize 方法,initialize(…)方法是该类的初始化方法,在调用其他方法前先调用该方法,并且只调用一次。从在代码3.9 中可以看到,该类对类FileSplit 的对象split 进行了分析,属性start 表示split的起始位置,属性end 表示split 的结束位置,属性in 表示split 的阅读器。
1 public void initialize(InputSplit genericSplit, 2 TaskAttemptContext context) throws IOException { 3 FileSplit split = (FileSplit) genericSplit; 4 Configuration job = context.getConfiguration(); 5 this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 6 Integer.MAX_VALUE); 7 start = split.getStart(); 8 end = start + split.getLength(); 9 final Path file = split.getPath(); 10 compressionCodecs = new CompressionCodecFactory(job); 11 codec = compressionCodecs.getCodec(file); 12 13 // open the file and seek to the start of the split 14 FileSystem fs = file.getFileSystem(job); 15 FSDataInputStream fileIn = fs.open(split.getPath()); 16 17 if (isCompressedInput()) { 18 decompressor = CodecPool.getDecompressor(codec); 19 if (codec instanceof SplittableCompressionCodec) { 20 final SplitCompressionInputStream cIn = 21 ((SplittableCompressionCodec)codec).createInputStream( 22 fileIn, decompressor, start, end, 23 SplittableCompressionCodec.READ_MODE.BYBLOCK); 24 in = new LineReader(cIn, job); 25 start = cIn.getAdjustedStart(); 26 end = cIn.getAdjustedEnd(); 27 filePosition = cIn; 28 } else { 29 in = new LineReader(codec.createInputStream(fileIn, decompressor), 30 job); 31 filePosition = fileIn; 32 } 33 } else { 34 fileIn.seek(start); 35 in = new LineReader(fileIn, job); 36 filePosition = fileIn; 37 } 38 // If this is not the first split, we always throw away first record 39 // because we always (except the last split) read one extra line in 40 // next() method. 41 if (start != 0) { 42 start += in.readLine(new Text(), 0, maxBytesToConsume(start)); 43 } 44 this.pos = start; 45 } 46
代码 3.9
下面查看方法nextKeyValue 的源码,代码3.10
1 public boolean nextKeyValue() throws IOException { 2 if (key == null) { 3 key = new LongWritable(); 4 } 5 key.set(pos); 6 if (value == null) { 7 value = new Text(); 8 } 9 int newSize = 0; 10 // We always read one extra line, which lies outside the upper 11 // split limit i.e. (end - 1) 12 while (getFilePosition() <= end) { 13 newSize = in.readLine(value, maxLineLength, 14 Math.max(maxBytesToConsume(pos), maxLineLength)); 15 if (newSize == 0) { 16 break; 17 } 18 pos += newSize; 19 if (newSize < maxLineLength) { 20 break; 21 } 22 23 // line too long. try again 24 LOG.info("Skipped line of size " + newSize + " at pos " + 25 (pos - newSize)); 26 } 27 if (newSize == 0) { 28 key = null; 29 value = null; 30 return false; 31 } else { 32 return true; 33 } 34 }
代码3.10
在代码3.10 中,key 的值是pos 的值,那么这个pos 的值来自第13 行的in.readLine(…)方法的返回值。类LineReader 的方法readLine 是读取每一行的内容,把内容存放到第一个参数value 中,返回值表示读取的字节数。从这里可以看到,类LineRecordReader 的属性key表示InputSplit 中读取的字节位置,value 表示读取的文本行的内容。看一下代码3.11
1 @Override 2 public LongWritable getCurrentKey() { 3 return key; 4 } 5 6 @Override 7 public Text getCurrentValue() { 8 return value; 9 }
代码 3.11
在代码 3.11中,方法getCurrentKey()返回的是key 的值,方法getCurrentValue()返回的是value 的值。
综合以上的分析来看,该类中的getCurrentKeyValue()会被不断的调用,每次被调用后,会同时调用getCurrentKey()和getCurrentValue()。
该类是对类FileSystem 操作执行输出的,会对运算的结果先写入到一个临时文件夹中,待运算结束后,再移动到最终的输出目录中。那么,输出的内容具体是什么格式?这是由TextOutputFormat 类负责的。
该类专门输出普通文本文件的,如代码3.12
1 package org.apache.hadoop.mapreduce.lib.output; 2 3 import java.io.DataOutputStream; 4 import java.io.IOException; 5 import java.io.UnsupportedEncodingException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.fs.FSDataOutputStream; 11 12 import org.apache.hadoop.io.NullWritable; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.compress.CompressionCodec; 15 import org.apache.hadoop.io.compress.GzipCodec; 16 import org.apache.hadoop.mapreduce.OutputFormat; 17 import org.apache.hadoop.mapreduce.RecordWriter; 18 import org.apache.hadoop.mapreduce.TaskAttemptContext; 19 import org.apache.hadoop.util.*; 20 21 /** An {@link OutputFormat} that writes plain text files. */ 22 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { 23 protected static class LineRecordWriter<K, V> 24 extends RecordWriter<K, V> { 25 private static final String utf8 = "UTF-8"; 26 private static final byte[] newline; 27 static { 28 try { 29 newline = "\n".getBytes(utf8); 30 } catch (UnsupportedEncodingException uee) { 31 throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); 32 } 33 } 34 35 protected DataOutputStream out; 36 private final byte[] keyValueSeparator; 37 38 public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { 39 this.out = out; 40 try { 41 this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 42 } catch (UnsupportedEncodingException uee) { 43 throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); 44 } 45 } 46 47 public LineRecordWriter(DataOutputStream out) { 48 this(out, "\t"); 49 }
代码3.12
在代码3.10 中,文本输出的时候使用UTF-8 编码,次第29行的代码可以看出,划分行的符号是“\n”。从第47行的构造方法可以看出,输出的键值对的默认分隔符是制表符“\t”。由此不难理解,为什么输出文件中是一行行的内容,为什么键值对使用制表符分隔了。
标签:des style blog http color io os 使用 java
原文地址:http://www.cnblogs.com/sunddenly/p/3985386.html