标签:
在大型互联网应用架构中,通常由多台Memcache缓存服务器来构建Memcache集群,也叫做分布式Memcache。数据在写入缓存和从缓存中读取时,都会采用某中Hash算法,将数据Hash到某台具体的Memcache上,为了使应用在线的动态增加和移除Memcache服务器而不影响或很少影响其他已部署的Memcache服务器(也就是其他Memcache服务器中缓存的数据还能正常使用),这种Hash算法被称作为一致性Hash。
由一篇文章---Memcache分布式实现原理我们知道,Java_Memcahe支持一般Hash算法和实现应用中使用最广泛的一致性Hash算法,这两种算法都支持Server配置权重的虚拟节点Hash算法。下面我们由浅入深一一来探讨学习一下Java_Memcache的各种Hash 算法。
根据Memcache操作的步骤
SockIOPool sockpool= SockIOPool.getInstance(); //设置缓存服务器地址,可以设置多个实现分布式缓存 sockpool.setServers(new String[]{"127.0.0.1:11211","127.0.0.1:11212"}); //设置初始连接5 sockpool.setInitConn(5); ...... sockpool.initialize();//根据server以及一些配置信息初始化 memCache = new MemCachedClient(); //使用memcache操作数据memCache.get() & memCache.set()
在SockIOPool类初始化时会调用SchoonerSocketIOPool的PopulateBuckets方法,保存好Server信息,以便后续操作数据时使用。
private void populateBuckets() { buckets = new ArrayList(); for(int i = 0; i < servers.length; i++) { if(weights != null && weights.length > i) { for(int j = 0; j < weights[i].intValue(); j++) buckets.add(servers[i]); } else { buckets.add(servers[i]);//按Server的个数保存好Server地址及端口 } Object obj; if(authInfo != null) obj = new AuthSchoonerSockIOFactory(servers[i], isTcp, bufferSize, socketTO, socketConnectTO, nagle, authInfo); else obj = new SchoonerSockIOFactory(servers[i], isTcp, bufferSize, socketTO, socketConnectTO, nagle); GenericObjectPool genericobjectpool = new GenericObjectPool(((org.apache.commons.pool.PoolableObjectFactory) (obj)), maxConn, (byte)1, maxIdle, maxConn); ((SchoonerSockIOFactory) (obj)).setSockets(genericobjectpool); socketPool.put(servers[i], genericobjectpool);//保存好Server与对应的对象工厂,该工厂管理与Server建立Socket的连接 } }
在不考虑权重时,按Server的个数保存好Server信息,并保存与该server对应的对象工厂,用于管理与server建立Socket连接。
在使用MemcacheClient进行get/set 时实际操作的是AscIIClient的get/set操作,由Memcache分布式原理我们知道,get方法最后执行的是get(String s, String s1, Integer integer, boolean flag)方法。
public class AscIIClient extends MemCachedClient { private Object get(String s, String s1, Integer integer, boolean flag) { SchoonerSockIO schoonersockio; String s2; ...... s1 = sanitizeKey(s1);//UTF-8编码 schoonersockio = pool.getSock(s1, integer);//获取对应的memcache server socket,后续直接从该server上读取数据 ...... } }
然后我们再看看pool.getSocket
public class SchoonerSockIOPool { public final SchoonerSockIO getSock(String s, Integer integer) { ...... if(i == 1){ //这里作了一点优化,如果只有一台Memcache服务器,就直接拿List中的第一个Server,没必要后面去Hash SchoonerSockIO schoonersockio = hashingAlg != 3 ? getConnection((String)buckets.get(0)) : getConnection((String)consistentBuckets.get(consistentBuckets.firstKey())); return schoonersockio; } HashSet hashset = new HashSet(Arrays.asList(servers)); long l = getBucket(s, integer);//利用key--s的hashcode%Server个数,找到某个server在list中的位置 String s1 = hashingAlg != 3 ? (String)buckets.get((int)l) : (String)consistentBuckets.get(Long.valueOf(l)); do { if(hashset.isEmpty()) break; SchoonerSockIO schoonersockio1 = getConnection(s1);//根据Server IP和商品,建立Socket if(schoonersockio1 != null) return schoonersockio1; //返回Socket ...... } while(true); return null; } private final long getBucket(String s, Integer integer) { long l = getHash(s, integer); ...... long l1 = l % (long)buckets.size(); //利用HashCode % server个数 if(l1 < 0L) l1 *= -1L; return l1; } private final long getHash(String s, Integer integer) { if(integer != null) if(hashingAlg == 3) return integer.longValue() & 4294967295L; else return integer.longValue(); switch(hashingAlg) { case 0: // '\0' return (long)s.hashCode();//直接返回key的hashCode case 1: // '\001' return origCompatHashingAlg(s); case 2: // '\002' return newCompatHashingAlg(s); case 3: // '\003' return md5HashingAlg(s); } hashingAlg = 0; return (long)s.hashCode(); }
小结: 在初始化时将所有的Memcache Server存放在List中,后面操作数据时,根据数据的HashCode % server个数,计算出数据真正操作在哪台Server上,然后再建立与该Server的socket连接,操作数据。
问题:在site上,如果后期动态增加或移除Memcache server,数据的hashcode没有发生变化,但是由于server数据发生变化,取模运算的模变化了,计算的结果也会发生变化,所有Memcache Server上的数据都会失效,这对后台数据库来说简单就是灾难。
在上述算法中,利用取模运算,每台Memcache Server具体等同的概率被命中,但是在实际应用中,各个server的硬件,软件配置不同,都使用等同的命中率是不太合适的。例如,server A和C的可供Memcach使用内存都是2G,另外一台server B是8G,使用相同的命中率显然会浪费宝贵的内存资源。
为了解决这一问题,Memcache Client引入了权重的概念,算法会根据每台Server的权重来分配命中到该机器上的概率。例如:server A和C的权重都设置为2,server B的权重设置为8,那么A、C的命中率为2/12,B的命中率为8/12。话不多话,直接上代码。
private void populateBuckets() { buckets = new ArrayList(); for(int i = 0; i < servers.length; i++) { if(weights != null && weights.length > i) { //如果配置了权重,根据权重在list中保存server信息时增加虚拟server for(int j = 0; j < weights[i].intValue(); j++) buckets.add(servers[i]); } ......
结合前面的例子会更容易让人理解,在list中保存2次server A的信息,8次server B的信息,2次server C的信息,总共12个server。后面使用相同的Hash算法进行%12运算,根据概率的平均分布,list中每个server的概率都是1/12,但是其中2个server对应的是server A,8个server对应的是server B,2个server对应的是server C。
这样,根据各个server的权重就能决定server的数据命中率,以达到合理使用资源的要求了。
在SockIOPool初始化时根据配置的hash算法完成初始化, SocketIOPool中hashingAlg ==3表示一致性Hash.
public class SchoonerSockIOPool { private TreeMap consistentBuckets; ...... private void populateConsistentBuckets() { consistentBuckets = new TreeMap(); MessageDigest messagedigest = (MessageDigest)MD5.get(); if(totalWeight.intValue() <= 0 && weights != null) { for(int i = 0; i < weights.length; i++) { SchoonerSockIOPool schoonersockiopool = this; schoonersockiopool.totalWeight = Integer.valueOf(schoonersockiopool.totalWeight.intValue() + (weights[i] != null ? weights[i].intValue() : 1)); } } if(weights == null) totalWeight = Integer.valueOf(servers.length); for(int j = 0; j < servers.length; j++) { int k = 1; if(weights != null && weights[j] != null) k = weights[j].intValue(); double d = Math.floor((double)(40 * servers.length * k) / (double)totalWeight.intValue()); for(long l = 0L; (double)l < d; l++) { byte abyte0[] = messagedigest.digest((new StringBuilder()).append(servers[j]).append("-").append(l).toString().getBytes()); for(int i1 = 0; i1 < 4; i1++) { Long long1 = Long.valueOf((long)(abyte0[3 + i1 * 4] & 255) << 24 | (long)(abyte0[2 + i1 * 4] & 255) << 16 | (long)(abyte0[1 + i1 * 4] & 255) << 8 | (long)(abyte0[0 + i1 * 4] & 255)); consistentBuckets.put(long1, servers[j]); } } Object obj; if(authInfo != null) obj = new AuthSchoonerSockIOFactory(servers[j], isTcp, bufferSize, socketTO, socketConnectTO, nagle, authInfo); else obj = new SchoonerSockIOFactory(servers[j], isTcp, bufferSize, socketTO, socketConnectTO, nagle); GenericObjectPool genericobjectpool = new GenericObjectPool(((org.apache.commons.pool.PoolableObjectFactory) (obj)), maxConn, (byte)1, maxIdle, maxConn); ((SchoonerSockIOFactory) (obj)).setSockets(genericobjectpool); socketPool.put(servers[j], genericobjectpool); } }
上面的算法理解起来比较费力,先考虑只有没有配置权重的情况,算法简化为
for(int j = 0; j < servers.length; j++) { double d = 40; for(long l = 0L; (double)l < 40 ; l++) { byte abyte0[] = messagedigest.digest((new StringBuilder()).append(servers[j]).append("-").append(l).toString().getBytes()); for(int i1 = 0; i1 < 4; i1++) { Long long1 = Long.valueOf((long)(abyte0[3 + i1 * 4] & 255) << 24 | (long)(abyte0[2 + i1 * 4] & 255) << 16 | (long)(abyte0[1 + i1 * 4] & 255) << 8 | (long)(abyte0[0 + i1 * 4] & 255)); consistentBuckets.put(long1, servers[j]); } } }
首先利用server信息进行MD5加密产生字符数组,然后最内层的for循环将字符数据分割成4片,每一片转换为32位的long值
a[3] a[2] a[1] a[0]
a[7] a[6] a[5] a[4]
a[11] a[10] a[9] a[8]5
a[15] a[14] a[13] a[12]
归根到底是为了保证每个long值的唯一性。
每台server都会保存160份信息在Map中。
在使用MemcacheClient进行get/set 时实际操作的是AscIIClient的get/set操作,get方法最后执行的是get(String s, String s1, Integer integer, boolean flag)方法,先调用 pool.getSocket获取对应memache server的socket,然后再操作数据。
public class SchoonerSockIOPool { public final SchoonerSockIO getSock(String s, Integer integer) { ...... HashSet hashset = new HashSet(Arrays.asList(servers)); long l = getBucket(s, integer); String s1 = hashingAlg != 3 ? (String)buckets.get((int)l) : (String)consistentBuckets.get(Long.valueOf(l)); do { if(hashset.isEmpty()) break; SchoonerSockIO schoonersockio1 = getConnection(s1);//根据Server IP和商品,建立Socket if(schoonersockio1 != null) return schoonersockio1; //返回Socket ...... } while(true); return null; } private final long getBucket(String s, Integer integer) { long l = getHash(s, integer); if(hashingAlg == 3) return findPointFor(Long.valueOf(l)).longValue(); ...... } private final long getHash(String s, Integer integer) { ...... switch(hashingAlg) { ...... case 3: // '\003' return md5HashingAlg(s);//返回key的MD5 hashCode } hashingAlg = 0; return (long)s.hashCode(); } private static long md5HashingAlg(String s) { //根据key值进行MD5加密,取前32位作为hashCode MessageDigest messagedigest = (MessageDigest)MD5.get(); messagedigest.reset(); messagedigest.update(s.getBytes()); byte abyte0[] = messagedigest.digest(); long l = (long)(abyte0[3] & 255) << 24 | (long)(abyte0[2] & 255) << 16 | (long)(abyte0[1] & 255) << 8 | (long)(abyte0[0] & 255); //在初始化时也用到了该值,只不过初始化时计算MD5是用的server信息 return l; } private final Long findPointFor(Long long1) { SortedMap sortedmap = consistentBuckets.tailMap(long1); return sortedmap.isEmpty() ? (Long)consistentBuckets.firstKey() : (Long)sortedmap.firstKey(); }
这里需要了解一下tailMap API
翻译过来就是该方法返回Map中key值大于或等于参数的所有元素的视图,对该视图的修改都会反映到Map中,反之对Map的修改也会反映到视图中。
在找到Hash值大于key Hash值的所有server后,使用第一个server。整个处理流程可以表示为下图。
小结:一致性Hash算法首先利用server信息进行MD5 Hash,将server映射到32位的圆环上(只取32位),在操作数据时,用MD5对data进行Hash,将data映射到32位的圆环上(也取32位),顺时针找到第一个Hash值比data Hash值大的server。
需要注意的是,该算法也加入了虚拟节点的思想,每节server进行MD5 Hash映射到圆环上时都虚拟到了很多个节点,在本例中每台server虚拟出了160个节点,这样做是为了保证数据能较均匀的分布到各个server上。
采用MD5 对server和data进行Hash时不依赖于其他server,因此Hash值的计算是一致性的,而仅仅在将data映射到某台server上时取决于第一个Hash值大于或等于data Hash值的server,在动态增加和移除server时仅会影响该server右边直到逆时针下一个server之间的data,而其他区间的data不受影响,该算法被广泛应用中Memcache集群中。
对于一般Hash算法,一致性Hash算法还有一个很重要的区别:在没有配置权重时,一致性Hash算法也会增加虚拟节点。基于权重的一致性Hash算法也是为了保证数据的命中率。
结合一般Hash算法中的例子,Server A的权重是2,Server B的权重是8,Server C的权重是2,在初始化populateConsistentBuckets 中,Server A会分配80个虚拟节点,Server B会分配320个虚拟节点,Server C会分配80个虚拟节点。
通过对源码的分析,我们理清了Memcache 的一般Hash算法和一致性Hash算法,揭开了Memcache分布式算法的真实面目,在理解完算法后,我们是不是应该更加透澈地理解Memcache了呢?(写于2015-05-17)
标签:
原文地址:http://blog.csdn.net/musa875643dn/article/details/45796887