Mapper的输出,在发送到Reducer前是存放在本地文件系统的,IFile提供了对Mapper输出的管理。我们已经知道,Mapper的输出是<Key,Value>对,IFile以记录<key-len, value-len, key,value>的形式存放了这些数据。为了保存键值对的边界,很自然IFile需要保存key-len和value-len。
和IFile相关的类图如下:
其中,文件流形式的输入和输出是由IFIleInputStream和IFIleOutputStream抽象。以记录形式的读/写操作由IFile.Reader/IFile.Writer提供,IFile.InMemoryReader用于读取存在于内存中的IFile文件格式数据。
我们以输出为例,来分析这部分的实现。首先是下图的和序列化反序列化相关的Serialization/Deserializer,这部分的code是在包org.apache.hadoop.io.serializer。序列化由Serializer抽象,通过Serializer的实现,用户可以利用serialize方法把对象序列化到通过open方法打开的输出流里。Deserializer提供的是相反的过程,对应的方法是deserialize。hadoop.io.serializer中还实现了配合工作的Serialization和对应的工厂SerializationFactory。两个具体的实现是WritableSerialization和JavaSerialization,分别对应了Writeble的序列化反序列化和Java本身带的序列化反序列化。
有了Serializer/Deserializer,我们来分析IFile.Writer。Writer的构造函数是:
public Writer(Configuration conf,FSDataOutputStream out,
Class<K> keyClass, Class<V>valueClass,
CompressionCodec codec, Counters.CounterwritesCounter)
conf,配置参数,out是Writer的输出,keyClass 和valueClass是输出的Kay,Value的class属性,codec是对输出进行压缩的方法,参数writesCounter用于对输出字节数进行统计的Counters.Counter。通过这些参数,我们可以构造我们使用的支持压缩功能的输出流(类成员out,类成员rawOut保存了构造函数传入的out),相关的计数器,还有就是Kay,Value的Serializer方法。
Writer最主要的方法是append方法(居然不是write方法,呵呵),有两种形式:
public void append(K key, V value) throws IOException {
public void append(DataInputBuffer key,DataInputBuffer value)
append(K key, V value)的主要过程是检查参数,然后将key和value序列化到DataOutputBuffer中,并获取序列化后的长度,最后把长度(2个)和DataOutputBuffer中的结果写到输出,并复位DataOutputBuffer和计数。append(DataInputBufferkey, DataInputBuffer value)处理过程也比较类似,就不再分析了。
close方法中需要注意的是,我们需要标记文件尾,或者是流结束。目前是通过写2个值为EOF_MARKER的长度来做标记。
IFileOutputStream是用于配合Writer的输出流,它会在IFiles的最后添加校验数据。当Writer调用IFileOutputStream的write操作时,IFileOutputStream计算并保持校验和,流被close的时候,校验结果会写到对应文件的文件尾。实际上存放在磁盘上的文件是一系列的<key-len, value-len, key, value>记录和校验结果。
更多精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
原文地址:http://crxy2013.blog.51cto.com/9922445/1655728