标签:style blog http ar io color os 使用 sp
1.定制Writable类型
Hadoop中有一套Writable实现,例如:IntWritable、Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型。
定制分以下几步:
由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法
如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法
Example:
CreateWritable.java
1 package cn.roboson.writable; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.io.WritableComparable; 9 /** 10 * 1.定制一个Writable类型,里面包含两个Text 11 * 2.和IntWritable等原有的Writable类似,它需要实现WritableComparable 12 * 3.由于hashCode()方法对reduce分区很重要,所以,需要重写java.lang.Object的hashCode()方法 13 * 4.如果要结合使用TextOutputFormat和定制的Writable,则许重写java.lang.Object的toString()方法 14 * @author roboson 15 * 16 */ 17 18 public class CreateWritable implements WritableComparable<CreateWritable>{ 19 20 private Text first; 21 private Text second; 22 23 //构造方法,这个是必须要的 24 public CreateWritable(){ 25 26 } 27 28 public CreateWritable(String first,String second){ 29 set(new Text(first), new Text(second)); 30 } 31 32 public CreateWritable(Text first,Text second){ 33 set(first,second); 34 } 35 36 37 38 public Text getFirst() { 39 return first; 40 } 41 42 public void setFirst(Text first) { 43 this.first = first; 44 } 45 46 public Text getSecond() { 47 return second; 48 } 49 50 public void setSecond(Text second) { 51 this.second = second; 52 } 53 54 public void set(Text first,Text second){ 55 this.first=first; 56 this.second=second; 57 } 58 59 60 @Override 61 public void readFields(DataInput in) throws IOException { 62 // TODO Auto-generated method stub 63 first.readFields(in); 64 second.readFields(in); 65 } 66 67 @Override 68 public void write(DataOutput out) throws IOException { 69 // TODO Auto-generated method stub 70 first.write(out); 71 second.write(out); 72 } 73 74 @Override 75 public int compareTo(CreateWritable other) { 76 // TODO Auto-generated method stub 77 int cmp=first.compareTo(other.getFirst()); 78 if(cmp!=0){ 79 return cmp; 80 } 81 return second.compareTo(other.getSecond()); 82 } 83 84 @Override 85 public int hashCode() { 86 // TODO Auto-generated method stub 87 return first.hashCode()*163 + second.hashCode(); 88 } 89 90 @Override 91 public String toString() { 92 // TODO Auto-generated method stub 93 return first+"\t"+second; 94 } 95 96 @Override 97 public boolean equals(Object obj) { 98 // TODO Auto-generated method stub 99 if(obj instanceof CreateWritable){ 100 CreateWritable create = (CreateWritable) obj; 101 return first.equals(create.getFirst()) && second.equals(create.getSecond()); 102 } 103 return false; 104 } 105 106 }
Writable06.java
1 package cn.roboson.writable; 2 3 import org.apache.hadoop.io.Text; 4 5 public class Writable06 { 6 7 public static void main(String[] args) { 8 CreateWritable createWritable01 = new CreateWritable("Hadoop", "roboson"); 9 CreateWritable createWritable02 = new CreateWritable(new Text("Hadoop"), new Text("roboson")); 10 11 //重写了equals()方法,相比叫first 和 second两者都相同的时候,返回true 12 System.out.println(createWritable01.equals(createWritable02)); 13 14 //重写了compareTo()方法,先比较first,再比较second,返回0:相等 -1:first<second 1:first>second 15 System.out.println(createWritable01.compareTo(createWritable02)); 16 17 //重写了toString()方法,返回 first +"\t"+second 18 System.out.println(createWritable01.toString()); 19 } 20 }
运行结果:
2.为速度实现一个RawComparator
关于Comparator方面的知识,可以参考我的博文《Hadoop中WritableComparable 和 comparator》,Comparator中,有一个方法是compare(byte[] b1,int s1,int l1,byte[] b2,int s2, int l2),该方法直接比较的是序列化,不必先将序列化数据流转化为对象,再进行比较,所以,与上面的compareTo()方法,相比,其更高效!其实,我们查看HadoopAPI,就可以发现,基本类型的Writable类都实现有了Comparator作为其内部类:
好,再看看IntWritable.Comparator这个类,如下图所示,发现它是一个静态类,好好观察它的结构:
通过上面的,我们可以发现,要为一个Writable实现一个Comparator,安装Hadoop的格式来,需要注意以下几点:
至于注册方面的知识,可以查看我的博客《Hadoop中Comparator原理》
Example:给上面自定义的CreateWritable类实现一个Comparator,也就是CreateWritable.Comparator类
CreateWritable.java
1 package cn.roboson.writable; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import java.util.Comparator; 7 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.WritableComparable; 10 import org.apache.hadoop.io.WritableComparator; 11 import org.apache.hadoop.io.WritableUtils; 12 /** 13 * 1.给定制的CreateWritable类实现一个Comparator,CreateWritable.Comparator 14 * @author roboson 15 * 16 */ 17 18 public class CreateWritable implements WritableComparable<CreateWritable>{ 19 20 private Text first; 21 private Text second; 22 23 //构造方法,这个是必须要的 24 public CreateWritable(){ 25 26 } 27 28 public CreateWritable(String first,String second){ 29 set(new Text(first), new Text(second)); 30 } 31 32 public CreateWritable(Text first,Text second){ 33 set(first,second); 34 } 35 36 37 38 public Text getFirst() { 39 return first; 40 } 41 42 public void setFirst(Text first) { 43 this.first = first; 44 } 45 46 public Text getSecond() { 47 return second; 48 } 49 50 public void setSecond(Text second) { 51 this.second = second; 52 } 53 54 public void set(Text first,Text second){ 55 this.first=first; 56 this.second=second; 57 } 58 59 60 @Override 61 public void readFields(DataInput in) throws IOException { 62 // TODO Auto-generated method stub 63 first.readFields(in); 64 second.readFields(in); 65 } 66 67 @Override 68 public void write(DataOutput out) throws IOException { 69 // TODO Auto-generated method stub 70 System.out.println(first); 71 first.write(out); 72 second.write(out); 73 } 74 75 @Override 76 public int compareTo(CreateWritable other) { 77 // TODO Auto-generated method stub 78 int cmp=first.compareTo(other.getFirst()); 79 if(cmp!=0){ 80 return cmp; 81 } 82 return second.compareTo(other.getSecond()); 83 } 84 85 @Override 86 public int hashCode() { 87 // TODO Auto-generated method stub 88 return first.hashCode()*163 + second.hashCode(); 89 } 90 91 @Override 92 public String toString() { 93 // TODO Auto-generated method stub 94 return first+"\t"+second; 95 } 96 97 @Override 98 public boolean equals(Object obj) { 99 // TODO Auto-generated method stub 100 if(obj instanceof CreateWritable){ 101 CreateWritable create = (CreateWritable) obj; 102 return first.equals(create.getFirst()) && second.equals(create.getSecond()); 103 } 104 return false; 105 } 106 107 108 //创建静态内部类:CreateWritable.Comparator 109 public static class Comparator extends WritableComparator{ 110 111 public Comparator() { 112 super(CreateWritable.class); 113 // TODO Auto-generated constructor stub 114 } 115 private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); 116 117 @Override 118 public int compare(byte[] b1, int s1, int l1, byte[] b2, 119 int s2, int l2) { 120 // TODO Auto-generated method stub 121 int firstL1 = 0,firstL2 = 0; 122 try { 123 firstL1 = WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1, s1); 124 firstL2 = WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2); 125 int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); 126 if(cmp !=0){ 127 return cmp; 128 } 129 } catch (IOException e) { 130 // TODO Auto-generated catch block 131 e.printStackTrace(); 132 } 133 return TEXT_COMPARATOR.compare(b1, s1+firstL1, l1-firstL1, b2, s2+firstL2, l2-firstL2); 134 } 135 136 } 137 static{ 138 WritableComparator.define(CreateWritable.class,new Comparator()); 139 } 140 }
Writable07.java
1 package cn.roboson.writable; 2 3 import java.io.ByteArrayOutputStream; 4 import java.io.DataOutputStream; 5 import java.io.IOException; 6 7 import org.apache.hadoop.io.RawComparator; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.io.Writable; 10 import org.apache.hadoop.io.WritableComparator; 11 12 public class Writable07 { 13 14 public static void main(String[] args) throws IOException { 15 16 CreateWritable create01 = new CreateWritable(new Text("Hadoop"), new Text("1")); 17 CreateWritable create02 = new CreateWritable(new Text("Hadoop"), new Text("2")); 18 byte[] b1 = serialize(create01); 19 byte[] b2 = serialize(create02); 20 RawComparator<CreateWritable> comparator = WritableComparator.get(CreateWritable.class); 21 int cmp =comparator.compare(b1, 0, b1.length, b2, 0, b2.length); 22 if(cmp ==0){ 23 System.out.println("create01 == create02"); 24 }else if(cmp ==-1){ 25 System.out.println("create01<create02"); 26 }else{ 27 System.out.println("create01>create02"); 28 } 29 } 30 31 public static byte[] serialize(Writable writable) throws IOException{ 32 ByteArrayOutputStream out = new ByteArrayOutputStream(); 33 DataOutputStream dataOut = new DataOutputStream(out); 34 writable.write(dataOut); 35 return out.toByteArray(); 36 37 } 38 }
运行结果:
分析:
在上面的CreateWritable的内部类Comparator中,需要实现一个方法compare()方法,那么compare该如何实现,才能够对序列化数据流进行比较。
CreateWritable是由两个Text对象组成的(Text first, Text second),而Text对象的二进制表示,是一个长度可变的整数,包含字符串之UTF-8表示的字节数以及UTF-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个Text对象的字节表示有多长;然后将该长度传递给Text对象的RawComparator方法,最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的比较。
也就是说Text对象序列化后,也是由两部分组成,一部分是记录本身有多少个字节,另一部分就是它自己的字节数!
记录它本身有多少个字节所占用的字节长度:WritableUtils.decodeVintSize()方法返回一个整数,代表它的字节长度。
它自己本身的字节数:WritableComparator的readVInt()方法返回一个整数,代表它本身的字节长度。
这样就方便了,相比叫first的序列化数据流,根据结果,判断,看是否需要再比较second的序列化数据流。
标签:style blog http ar io color os 使用 sp
原文地址:http://www.cnblogs.com/robert-blue/p/4165602.html