标签:
Hadoop序列化文件SequenceFile可以用于解决大量小文件(所谓小文件:泛指小于black大小的文件)问题,SequenceFile是Hadoop API提供的一种二进制文件支持。这种二进制文件直接将<key,value>对序列化到文件中,一般对小文件可以使用这种文件合并,即将文件名作为key,文件内容作为value序列化到大文件中。
hadoop Archive也是一个高效地将小文件放入HDFS块中的文件存档文件格式,详情请看:hadoop Archive
但是SequenceFile文件不能追加写入,适用于一次性写入大量小文件的操作。
SequenceFile的压缩基于CompressType,请看源码:
/** * The compression type used to compress key/value pairs in the * {@link SequenceFile}. * @see SequenceFile.Writer */ public static enum CompressionType { /** Do not compress records. */ NONE, //不压缩 /** Compress values only, each separately. */ RECORD, //只压缩values /** Compress sequences of records together in blocks. */ BLOCK //压缩很多记录的key/value组成块 }
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.Reader; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; /** * @version 1.0 * @author Fish */ public class SequenceFileWriteDemo { private static final String[] DATA = { "fish1", "fish2", "fish3", "fish4" }; public static void main(String[] args) throws IOException { /** * 写SequenceFile */ String uri = "/test/fish/seq.txt"; Configuration conf = new Configuration(); Path path = new Path(uri); IntWritable key = new IntWritable(); Text value = new Text(); Writer writer = null; try { /** * CompressionType.NONE 不压缩<br> * CompressionType.RECORD 只压缩value<br> * CompressionType.BLOCK 压缩很多记录的key/value组成块 */ writer = SequenceFile.createWriter(conf, Writer.file(path), Writer.keyClass(key.getClass()), Writer.valueClass(value.getClass()), Writer.compression(CompressionType.BLOCK)); for (int i = 0; i < 4; i++) { value.set(DATA[i]); key.set(i); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } /** * 读SequenceFile */ SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path)); IntWritable key1 = new IntWritable(); Text value1 = new Text(); while (reader.next(key1, value1)) { System.out.println(key1 + "----" + value1); } IOUtils.closeStream(reader);// 关闭read流 /** * 用于排序 */ // SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, comparator, IntWritable.class, Text.class, conf); } }
out = fs.create(p, true, bufferSize, replication, blockSize, progress);
SequenceFile文件的数据组成形式:
一,Header
写入头部的源码:
/** Write and flush the file header. */ private void writeFileHeader() throws IOException { out.write(VERSION);//版本号 Text.writeString(out, keyClass.getName());//key的Class Text.writeString(out, valClass.getName());//val的Class out.writeBoolean(this.isCompressed());//是否压缩 out.writeBoolean(this.isBlockCompressed());//是否是CompressionType.BLOCK类型的压缩 if (this.isCompressed()) { Text.writeString(out, (codec.getClass()).getName());//压缩类的名称 } this.metadata.write(out);//写入metadata out.write(sync); // write the sync bytes out.flush(); // flush header }版本号:
private static byte[] VERSION = new byte[] { (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA };
byte[] sync; // 16 random bytes { try { MessageDigest digester = MessageDigest.getInstance("MD5"); long time = Time.now(); digester.update((new UID()+"@"+time).getBytes()); sync = digester.digest(); } catch (Exception e) { throw new RuntimeException(e); } }二,Record
Writer有三个实现类,分别对应CompressType的NONE,RECOR,BLOCK。下面逐一介绍一下(结合上面的图看):
1,NONE SequenceFile
Record直接存Record 的长度,KEY的长度,key值,Value的值
2, BlockCompressWriter
/** Append a key/value pair. */ @Override @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key+" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val+" is not "+valClass); // Save key/value into respective buffers int oldKeyLength = keyBuffer.getLength(); keySerializer.serialize(key); int keyLength = keyBuffer.getLength() - oldKeyLength; if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); WritableUtils.writeVInt(keyLenBuffer, keyLength);//每调一次,都会累加keyLength int oldValLength = valBuffer.getLength(); uncompressedValSerializer.serialize(val); int valLength = valBuffer.getLength() - oldValLength; WritableUtils.writeVInt(valLenBuffer, valLength);//每调一次,都会累加valLength // Added another key/value pair ++noBufferedRecords; // Compress and flush? int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); if (currentBlockSize >= compressionBlockSize) { //compressionBlockSize = conf.getInt("io.seqfile.compress.blocksize", 1000000); //超过1000000就会写一个Sync sync(); }
超过compressionBlockSize的大小,就会调用sync()方法,下面看看sync的源码(和上面的图对照):
会写入和图中所画的各个数据项。
/** Compress and flush contents to dfs */ @Override public synchronized void sync() throws IOException { if (noBufferedRecords > 0) { super.sync(); // No. of records WritableUtils.writeVInt(out, noBufferedRecords); // Write 'keys' and lengths writeBuffer(keyLenBuffer); writeBuffer(keyBuffer); // Write 'values' and lengths writeBuffer(valLenBuffer); writeBuffer(valBuffer); // Flush the file-stream out.flush(); // Reset internal states keyLenBuffer.reset(); keyBuffer.reset(); valLenBuffer.reset(); valBuffer.reset(); noBufferedRecords = 0; } }
2,RecordCompressWriter
/** Append a key/value pair. */ @Override @SuppressWarnings("unchecked") public synchronized void append(Object key, Object val) throws IOException { if (key.getClass() != keyClass) throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass); if (val.getClass() != valClass) throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass); buffer.reset(); // Append the 'key' keySerializer.serialize(key); int keyLength = buffer.getLength(); if (keyLength < 0) throw new IOException("negative length keys not allowed: " + key); // Compress 'value' and append it deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); deflateFilter.finish(); // Write the record out checkAndWriteSync(); // sync out.writeInt(buffer.getLength()); // total record length record的长度 out.writeInt(keyLength); // key portion length key的长度 out.write(buffer.getData(), 0, buffer.getLength()); // data 数据 }写入Sync:
synchronized void checkAndWriteSync() throws IOException { if (sync != null && out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync sync(); } }
private static final int SYNC_ESCAPE = -1; // "length" of sync entries private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash /** The number of bytes between sync points.*/ public static final int SYNC_INTERVAL = 100*SYNC_SIZE;每2000个byte,就会写一个Sync。
总结:
Record:存储SequenceFile通用的KV数据格式,Key和Value都是二进制变长的数据。Record表示Key和Value的byte的总和。
Sync:主要是用来扫描和恢复数据的,以至于读取数据的Reader不会迷失。
Header:存储了如下信息:文件标识符SEQ,key和value的格式说明,以及压缩的相关信息,metadata等信息。
metadata:包含文件头所需要的数据:文件标识、Sync标识、数据格式说明(含压缩)、文件元数据(时间、owner、权限等)、检验信息等
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:
原文地址:http://blog.csdn.net/qianshangding0708/article/details/47666735