Memcache本身只是一个内存缓存服务器,用来缓存数据以缓解数据库压力,但是我们经常会听到分布式Memcache,那么它是如何实现的呢?在使用Java操作Memcache时,我们通常会借助Java_Memcache来帮助我们完成各项操作,如get/set/delete等。下面我们阅读一下Java_Memcache的源码来窥探一二。(注:网上很难找到Java_Memcache最新的源代码,但是可以利用Java 反编译工具来查看其源码,具体操作步骤可以参见我的另一篇博客--Eclipse设置Java反编译)
首先,在应用程序初始化或第一次使用MemcachedClient时需要SockIOPool完成对Memcache客户端连接Memcache Server的配置参数初始化,类似于数据库连接池管理,主要包括IP&端口,连接数控制,连接空闲时间等。
SockIOPool sockpool= SockIOPool.getInstance(); //设置缓存服务器地址,可以设置多个实现分布式缓存 sockpool.setServers(new String[]{"127.0.0.1:11211","127.0.0.1:11212"}); //设置初始连接,最小最大连接 sockpool.setInitConn(5); sockpool.setMinConn(5); sockpool.setMaxConn(250); //设置每个连接最大空闲时间1个小时 sockpool.setMaxIdle(1000 * 3600); sockpool.setMaintSleep(30); sockpool.setNagle(false); sockpool.setSocketTO(3000); sockpool.setSocketConnectTO(0); sockpool.initialize(); 然后就可以获取MemCachedClient实例,调用对应的get/set方法。 MemCachedClient memCache = new MemCachedClient(); Object obj = memCache.get(key); //memCache.set(key, value);
public class SockIOPool { //注意,受保护的构造函数 protected SockIOPool() { poolMultiplier = 3; minConn = 5; maxConn = 100; maxIdle = 300000L; maxBusyTime = 30000L; socketTO = 3000; socketConnectTO = 3000; failover = true; failback = true; nagle = false; } public static SockIOPool getInstance() { SockIOPool sockiopool = new SockIOPool(); sockiopool.schoonerSockIOPool = SchoonerSockIOPool.getInstance("default"); return sockiopool; } public void initialize() { schoonerSockIOPool.initialize(); } public void setServers(String as[]) { schoonerSockIOPool.setServers(as); } public void setInitConn(int i) { schoonerSockIOPool.setInitConn(i); } ...... private int poolMultiplier; private int minConn; private int maxConn; private long maxIdle; private long maxBusyTime; private int socketTO; private int socketConnectTO; private boolean failover; private boolean failback; private boolean nagle; //连接信息都委托给schoonnerSockIOPool管理 private SchoonerSockIOPool schoonerSockIOPool; ...... }
public class SchoonerSockIOPool { protected SchoonerSockIOPool(boolean flag) { initialized = false; minConn = 8; maxConn = 32; maxBusyTime = 30000L; maintSleep = 30000L; socketTO = 30000; socketConnectTO = 3000; maxIdle = 1000L; aliveCheck = false; failover = true; failback = true; nagle = false; hashingAlg = 0; totalWeight = Integer.valueOf(0); bufferSize = 1049600; isTcp = flag;//true,这里是采用TCP和memcache进行通信 } public static SchoonerSockIOPool getInstance(String s) { synchronized(pools) { if(!pools.containsKey(s)) { SchoonerSockIOPool schoonersockiopool = new SchoonerSockIOPool(true); pools.putIfAbsent(s, schoonersockiopool); } } return (SchoonerSockIOPool)pools.get(s); } public final void setServers(String as[]) { servers = as; } public final void setInitConn(int i) { if(i < minConn) minConn = i; } ...... public void initialize() { initDeadLock.lock(); if(servers == null || servers.length <= 0) { if(log.isErrorEnabled()) log.error("++++ trying to initialize with no servers"); throw new IllegalStateException("++++ trying to initialize with no servers"); } socketPool = new HashMap(servers.length); hostDead = new ConcurrentHashMap(); hostDeadDur = new ConcurrentHashMap(); <strong>//注意,这里提供了一致性Hash和一般Hash算法,我们先分析一般Hash</strong> if(hashingAlg == 3) populateConsistentBuckets(); else populateBuckets(); initialized = true; initDeadLock.unlock(); ...... } ...... private static ConcurrentMap pools = new ConcurrentHashMap(); boolean initialized; private int minConn; private int maxConn; private long maxBusyTime; private long maintSleep; private int socketTO; private int socketConnectTO; private static int recBufferSize = 128; private long maxIdle; private boolean aliveCheck; private boolean failover; private boolean failback; private boolean nagle; private int hashingAlg; private final ReentrantLock initDeadLock = new ReentrantLock(); }
我们先分析一下一般Hash算法,关于一致性Hash算法和虚拟节点(利用权重)的问题,会另起一篇单独探讨,在这里不考虑Server权重的问题
SchoonerSockIOPool 类
/**
 * Rebuilds the server bucket list and creates one socket object pool per
 * configured server (the non-consistent-hash initialization path).
 *
 * For every entry in {@code servers} this method:
 *   1. appends the server address to {@code buckets} (repeated
 *      {@code weights[i]} times when a weight is configured for index i,
 *      otherwise exactly once);
 *   2. creates a socket factory for that server;
 *   3. wraps the factory in a GenericObjectPool;
 *   4. registers the pool in {@code socketPool} keyed by the server address.
 */
private void populateBuckets() {
    buckets = new ArrayList();
    for(int i = 0; i < servers.length; i++) {
        // A server with a configured weight occupies that many bucket slots;
        // a server without one occupies a single slot.
        if(weights != null && weights.length > i){
            for(int j = 0; j < weights[i].intValue(); j++)
                buckets.add(servers[i]);
        } else{
            buckets.add(servers[i]);
        }
        // Choose the factory variant depending on whether authInfo is set.
        Object obj;
        if(authInfo != null)
            obj = new AuthSchoonerSockIOFactory(servers[i], isTcp, bufferSize, socketTO, socketConnectTO, nagle, authInfo);
        else
            obj = new SchoonerSockIOFactory(servers[i], isTcp, bufferSize, socketTO, socketConnectTO, nagle);
        // NOTE(review): going by the five-argument GenericObjectPool constructor
        // (factory, maxActive, whenExhaustedAction, maxWait, maxIdle), this pool's
        // maxIdle field lands in the maxWait position and maxConn is used for both
        // maxActive and maxIdle — confirm against the Commons Pool version in use.
        // NOTE(review): (byte)1 is the whenExhaustedAction; presumably the
        // "block" policy in Commons Pool 1.x — confirm.
        GenericObjectPool genericobjectpool = new GenericObjectPool(((org.apache.commons.pool.PoolableObjectFactory) (obj)), maxConn, (byte)1, maxIdle, maxConn);
        // Give the factory a back-reference to the pool it feeds.
        // The cast is applied on both branches, so AuthSchoonerSockIOFactory is
        // presumably a SchoonerSockIOFactory subtype — verify.
        ((SchoonerSockIOFactory) (obj)).setSockets(genericobjectpool);
        socketPool.put(servers[i], genericobjectpool);
    }
}
1.将server IP/port放入List
2.为每个server创建Socket工厂
3.为每个server创建对象池,以便管理socket连接,类似于数据库连接池、线程池
4.将server和对应的对象池放入map中
public class SchoonerSockIOFactory extends BasePoolableObjectFactory { public SchoonerSockIOFactory(String s, boolean flag, int i, int j, int k, boolean flag1) { host = s; isTcp = flag; bufferSize = i; socketTO = j; socketConnectTO = k; nagle = flag1; } public Object makeObject()throws Exception { SchoonerSockIO schoonersockio = createSocket(host); return schoonersockio; } ...... protected final SchoonerSockIO createSocket(String s) throws Exception { Object obj = null; if(isTcp) obj = new SchoonerSockIOPool.TCPSockIO(sockets, s, bufferSize, socketTO, socketConnectTO, nagle); else obj = new SchoonerSockIOPool.UDPSockIO(sockets, s, bufferSize, socketTO); return ((SchoonerSockIO) (obj)); } <strong>//工厂持有对象池的引用</strong> public void setSockets(GenericObjectPool genericobjectpool){ sockets = genericobjectpool; } protected GenericObjectPool sockets; protected String host; protected int bufferSize; protected int socketTO; protected int socketConnectTO; protected boolean isTcp; protected boolean nagle; }
package org.apache.commons.pool.impl; public class GenericObjectPool extends BaseObjectPool implements ObjectPool
{ ...... public GenericObjectPool(PoolableObjectFactory factory, int maxActive, byte whenExhaustedAction, long maxWait, int maxIdle){ this(factory, maxActive, whenExhaustedAction, maxWait, maxIdle, 0, false, false, -1L, 3, 1800000L, false); } ...... public GenericObjectPool(PoolableObjectFactory factory, int maxActive, byte whenExhaustedAction, long maxWait, int maxIdle, int minIdle, boolean testOnBorrow, boolean testOnReturn, long timeBetweenEvictionRunsMillis, int numTestsPerEvictionRun, long minEvictableIdleTimeMillis, boolean testWhileIdle, long softMinEvictableIdleTimeMillis, boolean lifo){ ...... _allocationQueue = new LinkedList(); _factory = factory; _maxActive = maxActive; _lifo = lifo; switch(whenExhaustedAction) { case 0: // '\0' case 1: // '\001' case 2: // '\002' _whenExhaustedAction = whenExhaustedAction; break; default: throw new IllegalArgumentException("whenExhaustedAction " + whenExhaustedAction + " not recognized."); } ...... _pool = new CursorableLinkedList(); startEvictor(_timeBetweenEvictionRunsMillis); } protected synchronized void startEvictor(long delay) { if(null != _evictor){ EvictionTimer.cancel(_evictor); _evictor = null; } if(delay > 0L){ _evictor = new Evictor(); EvictionTimer.schedule(_evictor, delay, delay); } }
对象池自身有单独的线程来维护对象池。关于池的知识,后续单独分析。
/**
 * Client entry point (excerpt; elided members are marked with "......").
 * The constructors shown here pick a concrete client implementation and
 * store it in the {@code client} field.
 */
public class MemCachedClient {
    /**
     * Default constructor: passes a null String with flag = true and
     * flag1 = false, which selects the AscIIClient branch below.
     */
    public MemCachedClient() {
        this(((String) (null)), true, false);
    }
    /**
     * @param s     forwarded unchanged to the chosen client's constructor
     *              (presumably the pool name — verify against the client classes)
     * @param flag  when flag1 is false: true selects AscIIClient, false selects AscIIUDPClient
     * @param flag1 true selects BinaryClient regardless of flag
     */
    public MemCachedClient(String s, boolean flag, boolean flag1) {
        BLAND_DATA_SIZE = " ".getBytes();
        if(flag1)
            client = new BinaryClient(s);
        else
            client = ((MemCachedClient) (flag ? ((MemCachedClient) (new AscIIClient(s))) : ((MemCachedClient) (new AscIIUDPClient(s)))));
    }
    ......
    // The concrete implementation selected by the constructor above.
    MemCachedClient client;
}
根据前面MemCachedClient操作步骤,我们知道这里创建的是AscIIClient实例
/**
 * MemCachedClient subclass (excerpt; elided members are marked with "......").
 * Only construction and default initialization are shown.
 */
public class AscIIClient extends MemCachedClient {
    /**
     * Stores the given pool name, installs an ObjectTransCoder and then
     * applies the defaults in {@link #init()}.
     *
     * @param s pool name; may be null, in which case init() falls back to "default"
     */
    public AscIIClient(String s) {
        super((MemCachedClient)null);
        transCoder = new ObjectTransCoder();
        poolName = s;
        init();
    }
    /**
     * Sets the client defaults and looks up the SchoonerSockIOPool
     * registered under the (possibly defaulted) pool name.
     */
    private void init() {
        sanitizeKeys = true;
        primitiveAsString = false;
        compressEnable = false;
        compressThreshold = 30720L;
        defaultEncoding = "UTF-8";
        // A null pool name falls back to the pool called "default".
        poolName = poolName != null ? poolName : "default";
        pool = SchoonerSockIOPool.getInstance(poolName);
    }
    ......
}
MemCachedClient操作数据的方法很多,有get/set/add/replace/gets/delete/addOrIncr等,还有一系列的重载方法,我们在此不一一分析,仅分析一下get/set方法,就足以深入学习和探讨MemCachedClient的实现机制,其他方法基本类似。
/**
 * Delegates the store operation to the wrapped client.
 *
 * @param s   cache key
 * @param obj value to store
 * @return whatever the wrapped client's set(s, obj) returns
 */
public boolean set(String s, Object obj) {
    return client.set(s, obj);
}
......
/**
 * Delegates the lookup to the wrapped client.
 *
 * @param s cache key
 * @return whatever the wrapped client's get(s) returns
 */
public Object get(String s) {
    return client.get(s);
}
/**
 * Delegates the store operation to the wrapped client.
 *
 * @param s   cache key
 * @param obj value to store
 * @return whatever the wrapped client's set(s, obj) returns
 */
public boolean set(String s, Object obj) {
    return client.set(s, obj);
}
实际操作的是AscIIClient的set方法
public class AscIIClient extends MemCachedClient { public boolean set(String s, Object obj) { return set("set", s, obj, null, null, Long.valueOf(0L), primitiveAsString); } private boolean set(String s, String s1, Object obj, Date date, Integer integer, Long long1, boolean flag) { SchoonerSockIO schoonersockio; int i; String s2; ...... try { s1 = sanitizeKey(s1);//UTF-8编码 } catch(UnsupportedEncodingException unsupportedencodingexception) { ...... } ...... schoonersockio = pool.getSock(s1, integer);//利用Key进行Hash,获取对应的Server ...... if(date == null) date = new Date(0L); i = NativeHandler.getMarkerFlag(obj); s2 = s + " " + s1 + " " + i + " " + date.getTime() / 1000L + " "; boolean flag1; schoonersockio.writeBuf.clear(); schoonersockio.writeBuf.put(s2.getBytes()); int j = schoonersockio.writeBuf.position(); schoonersockio.writeBuf.put(BLAND_DATA_SIZE); if(long1.longValue() != 0L) schoonersockio.writeBuf.put((new StringBuilder()).append(" ").append(long1.toString()).toString().getBytes()); schoonersockio.writeBuf.put(B_RETURN); SockOutputStream sockoutputstream = new SockOutputStream(schoonersockio); int k = 0; if(i != 0) { byte abyte0[]; if(flag) abyte0 = obj.toString().getBytes(defaultEncoding); else abyte0 = NativeHandler.encode(obj); sockoutputstream.write(abyte0); k = abyte0.length; } else { k = transCoder.encode(sockoutputstream, obj); } schoonersockio.writeBuf.put(B_RETURN); byte abyte1[] = (new Integer(k)).toString().getBytes(); int l = schoonersockio.writeBuf.position(); schoonersockio.writeBuf.position(j); schoonersockio.writeBuf.put(abyte1); schoonersockio.writeBuf.position(l); schoonersockio.flush(); String s3 = (new SockInputStream(schoonersockio, 2147483647)).getLine(); if(!"STORED\r\n".equals(s3)) break MISSING_BLOCK_LABEL_538; flag1 = true; if(schoonersockio != null) { schoonersockio.close(); schoonersockio = null; } ...... }
public final SchoonerSockIO getSock(String s, Integer integer) { ...... if(i == 1) { //如果只有一个server,直接建立与该Server的connection SchoonerSockIO schoonersockio = hashingAlg != 3 ? getConnection((String)buckets.get(0)) : getConnection((String)consistentBuckets.get(consistentBuckets.firstKey())); return schoonersockio; } HashSet hashset = new HashSet(Arrays.asList(servers)); long l = getBucket(s, integer);//利用Hahs算法找到对应的Server String s1 = hashingAlg != 3 ? (String)buckets.get((int)l) : (String)consistentBuckets.get(Long.valueOf(l)); do{ if(hashset.isEmpty()) break; SchoonerSockIO schoonersockio1 = getConnection(s1);//建立该Server的Connection if(schoonersockio1 != null) return schoonersockio1; ...... } } public final SchoonerSockIO getConnection(String s) { ...... GenericObjectPool genericobjectpool = (GenericObjectPool)socketPool.get(s);//获取ObjectPool SchoonerSockIO schoonersockio; try{ schoonersockio = (SchoonerSockIO)genericobjectpool.borrowObject();//通过池来获取连接 }catch(Exception exception){ schoonersockio = null; } ...... return schoonersockio; }
public final class SockOutputStream extends OutputStream { public final void write(byte abyte0[])throws IOException { write(abyte0, 0, abyte0.length); } public final void write(byte abyte0[], int i, int j) throws IOException { <strong>//将字符数组写入缓冲区</strong> if(j == 0) return; if(sock.writeBuf.remaining() >= j) { sock.writeBuf.put(abyte0, i, j); } else { int k = 0; boolean flag = false; for(int i1 = 0; (i1 = j - k) > 0;) { int l = sock.writeBuf.remaining(); l = l >= i1 ? i1 : l; if(l == 0) writeToChannel(); else sock.writeBuf.put(abyte0, i, l); k += l; } } count += j; }
实际操作的是AscIIClient的get方法
public class AscIIClient extends MemCachedClient { public Object get(String s) { return get(s, null); } ...... private Object get(String s, String s1, Integer integer, boolean flag) { SchoonerSockIO schoonersockio; String s2; try{ s1 = sanitizeKey(s1<strong>);//UTF-8编码</strong> } ...... schoonersockio = pool.getSock(s1, integer<strong>);//与set方法类似,利用Hash算法建立与server的socket连接</strong> ...... schoonersockio.writeBuf.clear(); schoonersockio.writeBuf.put(s2.getBytes()); schoonersockio.writeBuf.put(B_RETURN); schoonersockio.flush(); i = 0; j = 0; sockinputstream = new SockInputStream(schoonersockio, 2147483647); ...... byte abyte0[] = sockinputstream.getBuffer(); if((j & 2) == 2){ GZIPInputStream gzipinputstream = new GZIPInputStream(new ByteArrayInputStream(abyte0)); ByteArrayOutputStream bytearrayoutputstream = new ByteArrayOutputStream(abyte0.length); byte abyte1[] = new byte[2048]; int i1; while((i1 = gzipinputstream.read(abyte1)) != -1) bytearrayoutputstream.write(abyte1, 0, i1); abyte0 = bytearrayoutputstream.toByteArray(); gzipinputstream.close(); } if(primitiveAsString || flag) obj1 = new String(abyte0, defaultEncoding); else obj1 = NativeHandler.decode(abyte0, j); .... obj3 = obj1; return obj3; }
总结:本文概要地描述了Memcache的初始化以及对数据的get/set操作流程,中间涉及到利用Key值进行Hash算法获取对应的Server,根据Server对应的对象池获取Socket连接,以及通过SockInputStream/SockOutputStream操作数据。
在接下来的系列文章里,会深入研究各个部分的细节。(2015-05-12)
ps:如果您觉得本文对您的学习有所帮助,请多多支持。
Memcache分布式实现原理---Java_Memcache 源码分析
原文地址:http://blog.csdn.net/musa875643dn/article/details/45675711