标签:类型 tao records 一个 解决方案 lte null sdn void
SequenceFile文件是Hadoop为存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件的基础之上提出了一些HDFS中小文件存储的解决方案,其基本思路是将小文件合并成一个大文件,同时为这些小文件的位置信息构建索引。不过,这类解决方案还涉及到Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据按照key的某个顺序存储,同时不支持append操作。
在SequenceFile文件中,每一个key-value被看做是一条记录(Record),因此基于Record的压缩策略,SequenceFile文件可支持三种压缩类型(SequenceFile.CompressionType):
NONE: 对records不进行压缩;
RECORD: 仅压缩每一个record中的value值;
BLOCK: 将一个block中的所有records压缩在一起;
那么,基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:
SequenceFile.Writer 写入时不压缩任何的key-value对(Record);
- public static class Writer implements java.io.Closeable {
-
- ...
-
- void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException { // Stores the writer settings and opens the key/value serializers.
- this.conf = conf;
- this.out = out;
- this.keyClass = keyClass;
- this.valClass = valClass;
- this.compress = compress;
- this.codec = codec;
- this.metadata = metadata;
- // Note: the `name` parameter is not referenced in the lines shown here.
- // The key serializer and the uncompressed-value serializer both write into the shared `buffer`.
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keySerializer = serializationFactory.getSerializer(keyClass);
- this.keySerializer.open(buffer);
- this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
- this.uncompressedValSerializer.open(buffer);
- // Only when a codec is supplied: build a compressing stream chain that also ends in `buffer`
- // (compressedValSerializer -> deflateOut -> deflateFilter -> buffer).
- if (this.codec != null) {
- ReflectionUtils.setConf(this.codec, this.conf);
- this.compressor = CodecPool.getCompressor(this.codec);
- this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
- this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
- this.compressedValSerializer = serializationFactory.getSerializer(valClass);
- this.compressedValSerializer.open(deflateOut);
- }
- }
-
-
-
- public synchronized void append(Object key, Object val) throws IOException { // Serializes one key-value pair and writes it to `out` as a single record.
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);
- // The check compares Class objects with !=, so a subclass of keyClass/valClass is rejected as well.
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);
-
- buffer.reset();
- // Serialize the key first; the buffer length right after this step is the key's byte count.
-
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
- // Serialize the value behind the key in the same buffer; it goes through the
- // compressing stream when `compress` is set, otherwise it is written as-is.
- if (compress) {
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
- } else {
-
- uncompressedValSerializer.serialize(val);
- }
- // Record layout: total length (key + value bytes), key length, then the buffered bytes.
- // checkAndWriteSync() is defined outside this excerpt — presumably emits a sync marker when due; confirm.
- checkAndWriteSync();
- out.writeInt(buffer.getLength());
- out.writeInt(keyLength);
- out.write(buffer.getData(), 0, buffer.getLength());
- }
-
-
- public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException { // Writes an already-serialized key and a ValueBytes value as one record.
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
-
- checkAndWriteSync();
- // Same layout as append(): total length, key length, key bytes, then value bytes.
- // The value bytes are emitted through writeUncompressedBytes().
- out.writeInt(keyLength+valLength);
- out.writeInt(keyLength);
- out.write(keyData, keyOffset, keyLength);
- val.writeUncompressedBytes(out);
- }
-
- ...
-
- }
SequenceFile.RecordCompressWriter写入时只压缩key-value对(Record)中的value;
- static class RecordCompressWriter extends Writer {
- ...
-
- public synchronized void append(Object key, Object val) throws IOException { // Writes one record whose value is always passed through the compressing stream.
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);
- // Class objects are compared with !=, so only the exact key/value classes are accepted.
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);
-
- buffer.reset();
- // The key is serialized first; the buffer length at this point is the key length.
-
- keySerializer.serialize(key);
- int keyLength = buffer.getLength();
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
- // The value is serialized through deflateOut/deflateFilter; there is no
- // uncompressed branch here, unlike the `if (compress)` in Writer.append.
- deflateFilter.resetState();
- compressedValSerializer.serialize(val);
- deflateOut.flush();
- deflateFilter.finish();
- // Record layout: total buffered length, key length, then the buffered bytes.
- // checkAndWriteSync() is defined outside this excerpt — presumably emits a sync marker when due; confirm.
- checkAndWriteSync();
- out.writeInt(buffer.getLength());
- out.writeInt(keyLength);
- out.write(buffer.getData(), 0, buffer.getLength());
- }
-
-
- public synchronized void appendRaw(byte[] keyData, int keyOffset,
- int keyLength, ValueBytes val) throws IOException {
- // Writes a pre-serialized key followed by the value's compressed bytes as one record.
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + keyLength);
-
- int valLength = val.getSize();
- // NOTE(review): assumes val.getSize() is the size of the compressed bytes written below — confirm.
- checkAndWriteSync();
- out.writeInt(keyLength+valLength);
- out.writeInt(keyLength);
- out.write(keyData, keyOffset, keyLength);
- val.writeCompressedBytes(out);
- }
-
- }
-
-
- ...
- }
SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;
- static class BlockCompressWriter extends Writer {
- ...
-
- void init(int compressionBlockSize) throws IOException { // Records the block-size threshold and re-targets the serializers at per-block buffers.
- this.compressionBlockSize = compressionBlockSize;
- keySerializer.close();
- keySerializer.open(keyBuffer); // serialized keys now accumulate in keyBuffer
- uncompressedValSerializer.close();
- uncompressedValSerializer.open(valBuffer); // serialized values now accumulate, uncompressed, in valBuffer
- }
-
-
- private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException { // Compresses the given buffer and writes it to `out` with a length prefix.
- deflateFilter.resetState();
- buffer.reset();
- deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
- deflateOut.flush();
- deflateFilter.finish();
- // Length of `buffer` as a VInt, then its bytes; `buffer` is the sink deflateFilter was created on in Writer.init.
- WritableUtils.writeVInt(out, buffer.getLength());
- out.write(buffer.getData(), 0, buffer.getLength());
- }
-
-
- public synchronized void sync() throws IOException { // Writes all currently buffered records to `out` as one block.
- if (noBufferedRecords > 0) { // nothing is written when no records are buffered
- super.sync();
- // super.sync() is defined outside this excerpt — presumably writes the sync marker; confirm.
- // Block header: the number of records in this block, as a VInt.
- WritableUtils.writeVInt(out, noBufferedRecords);
-
- // Key lengths, then key bytes; each buffer is compressed separately by writeBuffer().
- writeBuffer(keyLenBuffer);
- writeBuffer(keyBuffer);
-
- // Value lengths, then value bytes, written the same way.
- writeBuffer(valLenBuffer);
- writeBuffer(valBuffer);
-
-
- out.flush();
-
- // Reset the per-block buffers and the record counter for the next block.
- keyLenBuffer.reset();
- keyBuffer.reset();
- valLenBuffer.reset();
- valBuffer.reset();
- noBufferedRecords = 0;
- }
-
- }
-
-
-
- public synchronized void append(Object key, Object val) throws IOException { // Buffers one key-value pair; output happens only when sync() is triggered.
- if (key.getClass() != keyClass)
- throw new IOException("wrong key class: "+key+" is not "+keyClass);
- // NOTE(review): these messages print the key/value itself; Writer.append and RecordCompressWriter.append print getClass().getName().
- if (val.getClass() != valClass)
- throw new IOException("wrong value class: "+val+" is not "+valClass);
-
- // keyBuffer is not reset per record, so the key length is the growth of keyBuffer caused by this key.
- int oldKeyLength = keyBuffer.getLength();
- keySerializer.serialize(key);
- int keyLength = keyBuffer.getLength() - oldKeyLength;
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed: " + key);
- WritableUtils.writeVInt(keyLenBuffer, keyLength);
-
- // Same approach for the value: serialized uncompressed into valBuffer, length recorded in valLenBuffer.
- int oldValLength = valBuffer.getLength();
- uncompressedValSerializer.serialize(val);
- int valLength = valBuffer.getLength() - oldValLength;
- WritableUtils.writeVInt(valLenBuffer, valLength);
-
-
- ++noBufferedRecords;
-
- // Block size counts only buffered key and value bytes (the length buffers are not included).
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-
- if (currentBlockSize >= compressionBlockSize) {
- sync();
- }
- }
-
-
- public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
- // Buffers an already-serialized key and a ValueBytes value; output happens only when sync() is triggered.
- if (keyLength < 0)
- throw new IOException("negative length keys not allowed");
-
- int valLength = val.getSize();
-
- // Lengths go to the *LenBuffer buffers as VInts; raw key bytes and uncompressed value bytes go to keyBuffer/valBuffer.
- WritableUtils.writeVInt(keyLenBuffer, keyLength);
- keyBuffer.write(keyData, keyOffset, keyLength);
- WritableUtils.writeVInt(valLenBuffer, valLength);
- val.writeUncompressedBytes(valBuffer);
-
-
- ++noBufferedRecords;
-
- // Call sync() once the buffered key + value bytes reach compressionBlockSize.
- int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
- if (currentBlockSize >= compressionBlockSize) {
- sync();
- }
- }
-
- }
-
-
- ...
- }
源码中,block的大小compressionBlockSize默认值为1000000,也可通过配置参数io.seqfile.compress.blocksize来指定。
根据这三种压缩类型,共有三种类型的SequenceFile文件格式:
1). Uncompressed SequenceFile
2). Record-Compressed SequenceFile
3). Block-Compressed SequenceFile
SequenceFile文件
标签:类型 tao records 一个 解决方案 lte null sdn void
原文地址:http://www.cnblogs.com/mfryf/p/7072446.html