标签:
Writable接口
Hadoop并没有使用JAVA的序列化机制,而是引入了自己的序列化系统,package org.apache.hadoop.io 这个包中定义了大量的可序列化对象,这些对象都实现了Writable接口,Writable接口是序列化对象的一个通用接口。其中包含了write()和readFields()两个序列化相关方法。
WritableComparable接口
WriteCompareable接口是对Wirtable接口的二次封装,提供了compareTo(T o)方法,用于序列化对象的比较。因为MR中间有个基于key的排序阶段。
RawComparable接口
Hadoop为优化Shuffle阶段的排序,提供了原生的比较器接口RawComparator<T>用于在字节流层面进行比较,从而大大缩短了比较的时间开销。该接口并非被多数的衍生类所实现,多数情况下其直接子类WritableComparator作为实现Writable接口类的内置类,提供序列化字节的比较功能。
WritableComparator类
1).对原始compare()方法的一个默认实现:先【反序列化】为对象,再通过【比较对象】,有开销的问题。所以,对于继承writeCompatable的具体子类都会要求覆写compare()方法以加快效率。
//原始compare()是将要比较的二进制流,先反序列化为对象,再调用对象的比较方法进行比较。 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { //利用Buffer为桥接中介,把字节数组存储为buffer后 buffer.reset(b1, s1, l1); //调用key1(WritableComparable类型)的反序列化方法 key1.readFields(buffer); buffer.reset(b2, s2, l2); key2.readFields(buffer); } catch (IOException e) { throw new RuntimeException(e); } //调用Writable对象的compare()比较方法进行比较 return compare(key1, key2); }
2).define()方法用于注册WritebaleComparaor对象到注册表中(Hadoop自动调用比较器)。
public static void define(Class c, WritableComparator comparator) { comparators.put(c, comparator); }
3).以上两个方法在自定义的WritableComparable子类类中,都必须覆写,以实现高效排序。
Writable类的字节长度
在定制Writable类之前,应该先了解不同Writable类占用磁盘空间的大小。通过减少Writable实例的字节数,加快数据的读取和减少网络的数据传输。下表显示的是Hadoop对Java基本类型包装后相应的Writable类占用的字节长度:
Java基本类型 |
字节数 |
Writable实现 |
序列化后字节数 (bytes) |
boolean |
1/8 |
BooleanWritable |
1 |
byte |
1 |
ByteWritable |
1 |
short |
2 |
ShortWritable |
2 |
int |
4 |
IntWritable |
4 |
|
|
VIntWritable |
1–5 |
float |
4 |
FloatWritable |
4 |
long |
8 |
LongWritable |
8 |
|
|
VLongWritable |
1–9 |
double |
8 |
DoubleWritable |
8 |
不同Writable类型序列化后的字节长度是不一样的,需要综合考虑应用中数据特征选择合适的类型。对于整数类型有两种选择,一种是定长(fixed-length)Writable类型,IntWritable和LongWritable;另一种是变长(variable-length)Writable类型,VIntWritable和VLongWritable。变长类型是根据数值的大小使用相应的字节长度表示,当数值在-112~127之间时使用1个字节表示,在-112~127范围之外的数值使用头一个字节表示该数值的正负符号以及字节长度(zero-compressed encoded integer)。
对于整数类型的Writable选择,建议:
package cn.itcast.hadoop.mr; import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.StringUtils; //测试十进制序列化成不同Writable类型所占用的字节数组长度 public class WritableBytesLengthDemo { public static void main(String[] args) throws IOException { //将十亿用不同Writable类型表示出来 IntWritable int_b = new IntWritable(1000000000); LongWritable long_b = new LongWritable(1000000000); VIntWritable vint_b = new VIntWritable(1000000000); VLongWritable vlong_b = new VLongWritable(1000000000); //将不同的Writable类型序列化成字节数组 byte[] bs_int_b = serialize(int_b); byte[] bs_long_b = serialize(long_b); byte[] bs_vint_b = serialize(vint_b); byte[] bs_vlong_b = serialize(vlong_b); //以十六进制形式打印字节数组,并打印出数组的长度 String hex = StringUtils.byteToHexString(bs_int_b); formatPrint("IntWritable", "1,000,000,000",hex, bs_int_b.length); hex = StringUtils.byteToHexString(bs_long_b); formatPrint("LongWritable", "1,000,000,000",hex, bs_long_b.length); hex = StringUtils.byteToHexString(bs_vint_b); formatPrint("VIntWritable", "1,000,000,000",hex, bs_vint_b.length); hex = StringUtils.byteToHexString(bs_vlong_b); formatPrint("VLongWritable", "1,000,000,000", hex, bs_vlong_b.length); } //定义输出格式 private static void formatPrint(String type, String param, String hex, int length) { String format = "%1$-50s %2$-16s with length: %3$2d%n"; System.out.format(format, "Byte array per " + type + "("+ param +") is:", hex, length); } //将一个实现了Writable接口的对象序列化成字节流 public static byte[] serialize(Writable writable) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(out); writable.write(dataOut); dataOut.close(); return out.toByteArray(); } //反序列化 public static Writable deserialize(Writable writable, byte[] bytes) throws IOException { ByteArrayInputStream in = new ByteArrayInputStream(bytes); DataInputStream dataIn = new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return writable; } }
Byte array per IntWritable(1,000,000,000) is: 3b9aca00 with length: 4
Byte array per LongWritable(1,000,000,000) is: 000000003b9aca00 with length: 8
Byte array per VIntWritable(1,000,000,000) is: 8c3b9aca00 with length: 5
Byte array per VLongWritable(1,000,000,000) is: 8c3b9aca00 with length: 5
从上面的输出我们可以看出:
Text的字节序列
下面以Text类中字节比较的代码进行说明:
/** A WritableComparator optimized for Text keys. */ public static class Comparator extends WritableComparator { public Comparator() { super(Text.class); } @Override //b1代表字节数组;s1代表一个text类型的起始字节;l1代表一个text类型的字节长度 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { //返回Text的字符长度 int n1 = WritableUtils.decodeVIntSize(b1[s1]); int n2 = WritableUtils.decodeVIntSize(b2[s2]); //比较器跳过 代表Text字符长度 的字节,直接比对UTF编码的真正的字符串部分的字节 //compareBytes()方法是对字节进行逐个比较。一旦找到一个不同的,然后就返回结果,后面的不管 return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2); } }
标签:
原文地址:http://www.cnblogs.com/skyl/p/4732250.html