标签:style blog http io color ar os java sp
以下文章取自:http://jameswxx.iteye.com/blog/1168711
memcached的java客户端有好几种,http://code.google.com/p/memcached/wiki/Clients 罗列了以下几种
- spymemcached
-
- * http://www.couchbase.org/code/couchbase/java
- o An improved Java API maintained by Matt Ingenthron and others at Couchbase.
- o Aggressively optimised, ability to run async, supports binary protocol, support Membase and Couchbase features, etc. See site for details.
-
- Java memcached client
-
- * http://www.whalin.com/memcached
- o A Java API is maintained by Greg Whalin from Meetup.com.
-
- More Java memcached clients
-
- * http://code.google.com/p/javamemcachedclient
- * http://code.google.com/p/memcache-client-forjava
- * http://code.google.com/p/xmemcached
-
- Integrations
-
- * http://code.google.com/p/simple-spring-memcached
- * http://code.google.com/p/memcached-session-manager
我看的是第二个:Java memcached client源码,代码很简洁,一共只有9个类,最主要的有以下三个
MemcachedClient.java 客户端,负责提供外出程序接口,如get/set方法等等
SockIOPool.java 一个自平衡的连接池
NativeHandler.java 负责部分数据类型的序列化
它包含以下几个部分
1:key的服务端分布
2:数据序列化和压缩
3:连接池(连接方式和池的动态自动平衡)
4:failover和failback机制
5:和memcached服务器的通讯协议
关于这几个点,我从key的set/get说起,会贯穿上面列举的4个部分。这个文章写下来,本来是作为一个笔记,思维比较跳跃,可能不是很连贯,如有疑问,欢迎站内交流。这个client的代码
很简洁明了,我也没有加过多注释,只是理了一个脉络。
从客户端自带的测试代码开始
- package com.meetup.memcached.test;
- import com.meetup.memcached.*;
- import org.apache.log4j.*;
-
- public class TestMemcached {
- public static void main(String[] args) {
- BasicConfigurator.configure();
- String[] servers = { "127.0.0.1:12000"};
- SockIOPool pool = SockIOPool.getInstance();
- pool.setServers( servers );
- pool.setFailover( true );
- pool.setInitConn( 10 );
- pool.setMinConn( 5 );
- pool.setMaxConn( 250 );
- pool.setMaintSleep( 30 );
- pool.setNagle( false );
- pool.setSocketTO( 3000 );
- pool.setAliveCheck( true );
-
- pool.initialize();
-
- MemcachedClient mcc = new MemcachedClient();
-
-
- com.meetup.memcached.Logger.getLogger( MemcachedClient.class.getName() ).setLevel( com.meetup.memcached.Logger.LEVEL_WARN );
-
- for ( int i = 0; i < 10; i++ ) {
- boolean success = mcc.set( "" + i, "Hello!" );
- String result = (String)mcc.get( "" + i );
- System.out.println( String.format( "set( %d ): %s", i, success ) );
- System.out.println( String.format( "get( %d ): %s", i, result ) );
- }
-
- System.out.println( "\n\t -- sleeping --\n" );
- try { Thread.sleep( 10000 ); } catch ( Exception ex ) { }
-
- for ( int i = 0; i < 10; i++ ) {
- boolean success = mcc.set( "" + i, "Hello!" );
- String result = (String)mcc.get( "" + i );
- System.out.println( String.format( "set( %d ): %s", i, success ) );
- System.out.println( String.format( "get( %d ): %s", i, result ) );
- }
- }
- }
以上代码大概做了这几件事情:
初始化一个连接池
新建一个memcached客户端
set一个key/value
get一个key,并且打印出value
这是我们实际应用中很常见的场景。
连接池的创建和初始化
连接池SockIOPool是非常重要的部分,它的好坏直接决定了客户端的性能。SockIOPool用一个HashMap持有多个连接池对象,连接池以名称作为标识,默认为"default"。看看
SockIOPool的getInstance方法就知道了。
- public static SockIOPool getInstance() {
- return getInstance("default");
- }
-
- public static synchronized SockIOPool getInstance(String poolName) {
- if (pools.containsKey(poolName)) return pools.get(poolName);
-
- SockIOPool pool = new SockIOPool();
- pools.put(poolName, pool);
-
- return pool;
- }
连接池实例化完成后,还需要初始化,看看pool.initialize()做了什么:
-
-
- public void initialize() {
-
- synchronized (this) {
-
- if (initialized && (buckets != null || consistentBuckets != null) && (availPool != null)&& (busyPool != null)) {
- log.error("++++ trying to initialize an already initialized pool");
- return;
- }
- <span style="color: #ff0000;">
- availPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);
-
- busyPool = new HashMap<String, Map<SockIO, Long>>(servers.length * initConn);
-
- deadPool = new IdentityHashMap<SockIO, Integer>();
- hostDeadDur = new HashMap<String, Long>();
- hostDead = new HashMap<String, Date>();
- maxCreate = (poolMultiplier > minConn) ? minConn : minConn / poolMultiplier;
- if (log.isDebugEnabled()) {
- log.debug("++++ initializing pool with following settings:");
- log.debug("++++ initial size: " + initConn);
- log.debug("++++ min spare : " + minConn);
- log.debug("++++ max spare : " + maxConn);
- }
- if (servers == null || servers.length <= 0) {
- log.error("++++ trying to initialize with no servers");
- throw new IllegalStateException("++++ trying to initialize with no servers");
- }
-
- if (this.hashingAlg == CONSISTENT_HASH) populateConsistentBuckets();
- else populateBuckets();
-
- this.initialized = true;
-
- if (this.maintSleep > 0) this.startMaintThread();
- }
- }
连接池的关闭
很简单,只是重置清空相关参数而已
- public void shutDown() {
- synchronized (this) {
- if (log.isDebugEnabled()) log.debug("++++ SockIOPool shutting down...");
-
- if (maintThread != null && maintThread.isRunning()) {
-
- stopMaintThread();
-
-
- while (maintThread.isRunning()) {
- if (log.isDebugEnabled()) log.debug("++++ waiting for main thread to finish run +++");
- try {
- Thread.sleep(500);
- } catch (Exception ex) {
- }
- }
- }
-
- if (log.isDebugEnabled()) log.debug("++++ closing all internal pools.");
- closePool(availPool);
- closePool(busyPool);
- availPool = null;
- busyPool = null;
- buckets = null;
- consistentBuckets = null;
- hostDeadDur = null;
- hostDead = null;
- maintThread = null;
- initialized = false;
- if (log.isDebugEnabled()) log.debug("++++ SockIOPool finished shutting down.");
- }
- }
连接池的自动平衡
SockIOPool的initialize()方法最后有这么一行代码
// start maint thread
if (this.maintSleep > 0) this.startMaintThread();
这是在初始化完成后,启动线程池平衡线程
- protected void startMaintThread() {
- if (maintThread != null) {
- if (maintThread.isRunning()) {
- log.error("main thread already running");
- } else {
- maintThread.start();
- }
- } else {
- maintThread = new MaintThread(this);
- maintThread.setInterval(this.maintSleep);
- maintThread.start();
- }
- }
MaintThread的run方法
- public void run() {
- this.running = true;
- while (!this.stopThread) {
- try {
- Thread.sleep(interval);
-
-
- if (pool.isInitialized()) pool.selfMaint();
- } catch (Exception e) {
- break;
- }
- }
- this.running = false;
其实最终的平衡方法是SockIOPool.selfMaint()
- protected void selfMaint() {
- if (log.isDebugEnabled()) log.debug("++++ Starting self maintenance....");
-
-
-
- Map<String, Integer> needSockets = new HashMap<String, Integer>();
-
- synchronized (this) {
-
- for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
- String host = i.next();
- Map<SockIO, Long> sockets = availPool.get(host);
-
- if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
- + sockets.size());
-
-
- if (sockets.size() < minConn) {
-
- int need = minConn - sockets.size();
- needSockets.put(host, need);
- }
- }
- }
-
-
- Map<String, Set<SockIO>> newSockets = new HashMap<String, Set<SockIO>>();
-
- for (String host : needSockets.keySet()) {
- Integer need = needSockets.get(host);
-
- if (log.isDebugEnabled()) log.debug("++++ Need to create " + need + " new sockets for pool for host: "
- + host);
-
- Set<SockIO> newSock = new HashSet<SockIO>(need);
- for (int j = 0; j < need; j++) {
- SockIO socket = createSocket(host);
- if (socket == null) break;
- newSock.add(socket);
- }
- newSockets.put(host, newSock);
- }
-
-
-
-
-
- synchronized (this) {
-
- for (String host : newSockets.keySet()) {
- Set<SockIO> sockets = newSockets.get(host);
- for (SockIO socket : sockets)
- addSocketToPool(availPool, host, socket);
- }
-
- for (Iterator<String> i = availPool.keySet().iterator(); i.hasNext();) {
- String host = i.next();
- Map<SockIO, Long> sockets = availPool.get(host);
- if (log.isDebugEnabled()) log.debug("++++ Size of avail pool for host (" + host + ") = "
- + sockets.size());
-
-
- if (sockets.size() > maxConn) {
-
- int diff = sockets.size() - maxConn;
- int needToClose = (diff <= poolMultiplier) ? diff : (diff) / poolMultiplier;
-
- if (log.isDebugEnabled()) log.debug("++++ need to remove " + needToClose
- + " spare sockets for pool for host: " + host);
-
- for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
- if (needToClose <= 0) break;
-
-
- SockIO socket = j.next();
- long expire = sockets.get(socket).longValue();
-
-
- 明,该连接在可用连接池呆得太久了,需要回收
- if ((expire + maxIdle) < System.currentTimeMillis()) {
- if (log.isDebugEnabled()) log.debug("+++ removing stale entry from pool as it is past its idle timeout and pool is over max spare");
-
-
- deadPool.put(socket, ZERO);
- j.remove();
- needToClose--;
- }
- }
- }
- }
-
-
- for (Iterator<String> i = busyPool.keySet().iterator(); i.hasNext();) {
- String host = i.next();
- Map<SockIO, Long> sockets = busyPool.get(host);
- if (log.isDebugEnabled()) log.debug("++++ Size of busy pool for host (" + host + ") = "
- + sockets.size());
-
- for (Iterator<SockIO> j = sockets.keySet().iterator(); j.hasNext();) {
-
- SockIO socket = j.next();
- long hungTime = sockets.get(socket).longValue();
-
- if ((hungTime + maxBusyTime) < System.currentTimeMillis()) {
- log.error("+++ removing potentially hung connection from busy pool ... socket in pool for "
- + (System.currentTimeMillis() - hungTime) + "ms");
-
-
- deadPool.put(socket, ZERO);
- j.remove();
- }
- }
- }
- }
-
-
- Set<SockIO> toClose;
- synchronized (deadPool) {
- toClose = deadPool.keySet();
- deadPool = new IdentityHashMap<SockIO, Integer>();
- }
-
- for (SockIO socket : toClose) {
- try {
- socket.trueClose(false);
- } catch (Exception ex) {
- log.error("++++ failed to close SockIO obj from deadPool");
- log.error(ex.getMessage(), ex);
- }
-
- socket = null;
- }
-
- if (log.isDebugEnabled()) log.debug("+++ ending self maintenance.");
- }
key的服务器端分布
初始化方法其实就是根据每个服务器的权重,建立一个服务器地址集合,如果选择了一致性哈希,则对服务器地址进行一致性哈希分布,一致性哈希算法比较简单,如果不了解的同学,可以
自行google一下,initialize() 方法里有这段代码:
//一致性哈希
- if (this.hashingAlg == CONSISTENT_HASH){
- populateConsistentBuckets();
- }else populateBuckets();
看看populateConsistentBuckets()方法
// 用一致性哈希算法将服务器分布在一个2的32次方的环里,服务器的分布位置<=servers.length*40*4
- private void populateConsistentBuckets() {
- if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");
-
-
- this.consistentBuckets = new TreeMap<Long, String>();
- MessageDigest md5 = MD5.get();
- if (this.totalWeight <= 0 && this.weights != null) {
- for (int i = 0; i < this.weights.length; i++)
- this.totalWeight += (this.weights[i] == null) ? 1 : this.weights[i];
- } else if (this.weights == null) {
- this.totalWeight = this.servers.length;
- }
-
- for (int i = 0; i < servers.length; i++) {
- int thisWeight = 1;
- if (this.weights != null && this.weights[i] != null) thisWeight = this.weights[i];
-
-
- double factor = Math.floor(((double) (40 * this.servers.length * thisWeight)) / (double) this.totalWeight);
-
-
- for (long j = 0; j < factor; j++) {
-
- byte[] d = md5.digest((servers[i] + "-" + j).getBytes());
-
- for (int h = 0; h < 4; h++) {
- Long k = ((long) (d[3 + h * 4] & 0xFF) << 24) | ((long) (d[2 + h * 4] & 0xFF) << 16)
- | ((long) (d[1 + h * 4] & 0xFF) << 8) | ((long) (d[0 + h * 4] & 0xFF));
- consistentBuckets.put(k, servers[i]);
- if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
- }
- }
-
-
- if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
- + servers[i]);
-
-
- for (int j = 0; j < initConn; j++) {
- SockIO socket = createSocket(servers[i]);
- if (socket == null) {
- log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
- break;
- }
-
-
- addSocketToPool(availPool, servers[i], socket);
- if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
- + " for host " + servers[i]);
- }
- }
如果不是一致性哈希,则只是普通分布,很简单,只是根据权重将服务器地址放入buckets这个List里
- private void populateBuckets() {
- if (log.isDebugEnabled()) log.debug("++++ initializing internal hashing structure for consistent hashing");
-
-
- this.buckets = new ArrayList<String>();
-
- for (int i = 0; i < servers.length; i++) {
- if (this.weights != null && this.weights.length > i) {
- for (int k = 0; k < this.weights[i].intValue(); k++) {
- this.buckets.add(servers[i]);
- if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
- }
- } else {
- this.buckets.add(servers[i]);
- if (log.isDebugEnabled()) log.debug("++++ added " + servers[i] + " to server bucket");
- }
-
-
- if (log.isDebugEnabled()) log.debug("+++ creating initial connections (" + initConn + ") for host: "
- + servers[i]);
-
- for (int j = 0; j < initConn; j++) {
- SockIO socket = createSocket(servers[i]);
- if (socket == null) {
- log.error("++++ failed to create connection to: " + servers[i] + " -- only " + j + " created.");
- break;
- }
-
-
- addSocketToPool(availPool, servers[i], socket);
- if (log.isDebugEnabled()) log.debug("++++ created and added socket: " + socket.toString()
- + " for host " + servers[i]);
- }
- }
- }
如何创建socket连接
在上面的private void populateBuckets()方法里,createSocket(servers[i])是创建到服务器的连接,看看这个方法
- protected SockIO createSocket(String host) {
- SockIO socket = null;
-
-
- private final ReentrantLock hostDeadLock = new ReentrantLock();
- hostDeadLock.lock();
- try {
-
-
-
-
- if (failover && failback && hostDead.containsKey(host) && hostDeadDur.containsKey(host)) {
- Date store = hostDead.get(host);
- long expire = hostDeadDur.get(host).longValue();
-
- if ((store.getTime() + expire) > System.currentTimeMillis()) return null;
- }
- } finally {
- hostDeadLock.unlock();
- }
-
-
- try {
- socket = new SockIO(this, host, this.socketTO, this.socketConnectTO, this.nagle);
- if (!socket.isConnected()) {
- log.error("++++ failed to get SockIO obj for: " + host + " -- new socket is not connected");
- deadPool.put(socket, ZERO);
- socket = null;
- }
- } catch (Exception ex) {
- log.error("++++ failed to get SockIO obj for: " + host);
- log.error(ex.getMessage(), ex);
- socket = null;
- }
-
-
-
- hostDeadLock.lock();
- try {
-
- if (socket == null) {
- Date now = new Date();
- hostDead.put(host, now);
-
-
- long expire = (hostDeadDur.containsKey(host)) ? (((Long) hostDeadDur.get(host)).longValue() * 2) : 1000;
-
- if (expire > MAX_RETRY_DELAY) expire = MAX_RETRY_DELAY;
-
- hostDeadDur.put(host, new Long(expire));
- if (log.isDebugEnabled()) log.debug("++++ ignoring dead host: " + host + " for " + expire + " ms");
-
-
- clearHostFromPool(availPool, host);
- } else {
- if (log.isDebugEnabled()) log.debug("++++ created socket (" + socket.toString() + ") for host: " + host);
-
- if (hostDead.containsKey(host) || hostDeadDur.containsKey(host)) {
- hostDead.remove(host);
- hostDeadDur.remove(host);
- }
- }
- } finally {
- hostDeadLock.unlock();
- }
-
- return socket;
- }
SockIO构造函数
- public SockIO(SockIOPool pool, String host, int timeout, int connectTimeout, boolean noDelay)
- throws IOException,
- UnknownHostException {
- this.pool = pool;
- String[] ip = host.split(":");
-
- sock = getSocket(ip[0], Integer.parseInt(ip[1]), connectTimeout);
- if (timeout >= 0) this.sock.setSoTimeout(timeout);
-
- sock.setTcpNoDelay(noDelay);
-
- in = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
- out = new BufferedOutputStream(sock.getOutputStream());
- this.host = host;
- }
getSocket方法
- protected static Socket getSocket(String host, int port, int timeout) throws IOException {
- SocketChannel sock = SocketChannel.open();
- sock.socket().connect(new InetSocketAddress(host, port), timeout);
- return sock.socket();
- }
可以看到,socket连接是用nio方式创建的。
新建MemcachedClient
MemcachedClient mcc = new MemcachedClient();新建了一个memcached客户端,看看构造函数,没作什么,只是设置参数而已。
- public MemcachedClient() {
- init();
- }
-
-
- private void init() {
- this.sanitizeKeys = true;
- this.primitiveAsString = false;
- this.compressEnable = true;
- this.compressThreshold = COMPRESS_THRESH;
- this.defaultEncoding = "UTF-8";
- this.poolName = ( this.poolName == null ) ? "default" : this.poolName;
-
-
- this.pool = SockIOPool.getInstance( poolName );
- }
set方法如何工作
到此memcached客户端初始化工作完成。再回到测试类TestMemcached,看看for循环里的
boolean success = mcc.set( "" + i, "Hello!" );
String result = (String)mcc.get( "" + i );
初始化后,就可以set,get了。看看set是怎么工作的。
- public boolean set( String key, Object value ) {
- return set( "set", key, value, null, null, primitiveAsString );
- }
-
-
-
- private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {
- if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) {
- log.error( "key is null or cmd is null/empty for set()" );
- return false;
- }
-
- try {
- key = sanitizeKey( key );
- }
- catch ( UnsupportedEncodingException e ) {
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
- log.error( "failed to sanitize your key!", e );
- return false;
- }
-
- if ( value == null ) {
- log.error( "trying to store a null value to cache" );
- return false;
- }
-
-
- SockIOPool.SockIO sock = pool.getSock( key, hashCode );
-
- if ( sock == null ) {
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
- return false;
- }
-
- if ( expiry == null )
- expiry = new Date(0);
-
-
- int flags = 0;
-
-
- byte[] val;
-
-
- if ( NativeHandler.isHandled( value ) ) {
- if ( asString ) {
-
- try {
- if ( log.isInfoEnabled() )
- log.info( "++++ storing data as a string for key: " + key + " for class: " + value.getClass().getName() );
- val = value.toString().getBytes( defaultEncoding );
- }
- catch ( UnsupportedEncodingException ue ) {
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, ue, key );
- log.error( "invalid encoding type used: " + defaultEncoding, ue );
- sock.close();
- sock = null;
- return false;
- }
- }
- else {
- try {
- if ( log.isInfoEnabled() )
- log.info( "Storing with native handler..." );
- flags |= NativeHandler.getMarkerFlag( value );
- val = NativeHandler.encode( value );
- }
- catch ( Exception e ) {
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
- log.error( "Failed to native handle obj", e );
-
- sock.close();
- sock = null;
- return false;
- }
- }
- }
- else {
-
- try {
- if ( log.isInfoEnabled() )
- log.info( "++++ serializing for key: " + key + " for class: " + value.getClass().getName() );
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- (new ObjectOutputStream( bos )).writeObject( value );
- val = bos.toByteArray();
- flags |= F_SERIALIZED;
- }
- catch ( IOException e ) {
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
-
-
- log.error( "failed to serialize obj", e );
- log.error( value.toString() );
-
-
- sock.close();
- sock = null;
- return false;
- }
- }
-
-
- if ( compressEnable && val.length > compressThreshold ) {
- try {
- if ( log.isInfoEnabled() ) {
- log.info( "++++ trying to compress data" );
- log.info( "++++ size prior to compression: " + val.length );
- }
- ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length );
- GZIPOutputStream gos = new GZIPOutputStream( bos );
- gos.write( val, 0, val.length );
- gos.finish();
- gos.close();
-
-
- val = bos.toByteArray();
- flags |= F_COMPRESSED;
-
- if ( log.isInfoEnabled() )
- log.info( "++++ compression succeeded, size after: " + val.length );
- }
- catch ( IOException e ) {
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
- log.error( "IOException while compressing stream: " + e.getMessage() );
- log.error( "storing data uncompressed" );
- }
- }
-
-
- try {
-
- String cmd = String.format( "%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length );
- sock.write( cmd.getBytes() );
- sock.write( val );
- sock.write( "\r\n".getBytes() );
- sock.flush();
-
-
- String line = sock.readLine();
- if ( log.isInfoEnabled() )
- log.info( "++++ memcache cmd (result code): " + cmd + " (" + line + ")" );
-
- if ( STORED.equals( line ) ) {
- if ( log.isInfoEnabled() )
- log.info("++++ data successfully stored for key: " + key );
- sock.close();
- sock = null;
- return true;
- }
- else if ( NOTSTORED.equals( line ) ) {
- if ( log.isInfoEnabled() )
- log.info( "++++ data not stored in cache for key: " + key );
- }
- else {
- log.error( "++++ error storing data in cache for key: " + key + " -- length: " + val.length );
- log.error( "++++ server response: " + line );
- }
- }
- catch ( IOException e ) {
-
-
- if ( errorHandler != null )
- errorHandler.handleErrorOnSet( this, e, key );
-
-
- log.error( "++++ exception thrown while writing bytes to server on set" );
- log.error( e.getMessage(), e );
-
- try {
- sock.trueClose();
- }
- catch ( IOException ioe ) {
- log.error( "++++ failed to close socket : " + sock.toString() );
- }
-
- sock = null;
- }
-
-
- if ( sock != null ) {
- sock.close();
- sock = null;
- }
- return false;
- }
通过set方法向服务器设置key和value,涉及到以下几个点
数据的压缩和序列化 (如果是get方法,则和set方法基本是相反的)
为key分配服务器 对于一些常用类型,采用自定义的序列化,具体要看NativeHander.java,这个类比较简单,有兴趣可以自己看看
- public static boolean isHandled( Object value ) {
- return (
- value instanceof Byte ||
- value instanceof Boolean ||
- value instanceof Integer ||
- value instanceof Long ||
- value instanceof Character ||
- value instanceof String ||
- value instanceof StringBuffer ||
- value instanceof Float ||
- value instanceof Short ||
- value instanceof Double ||
- value instanceof Date ||
- value instanceof StringBuilder ||
- value instanceof byte[]
- )
- ? true
- : false;
- }
其他类型则用java的默认序列化
为key选择服务器
SockIOPool.SockIO sock = pool.getSock( key, hashCode );就是为key选择服务器
- public SockIO getSock(String key, Integer hashCode) {
- if (log.isDebugEnabled()) log.debug("cache socket pick " + key + " " + hashCode);
- if (!this.initialized) {
- log.error("attempting to get SockIO from uninitialized pool!");
- return null;
- }
-
-
- if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)
- || (buckets != null && buckets.size() == 0)) return null;
-
-
- if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 1)
- || (buckets != null && buckets.size() == 1)) {
- SockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets.get(consistentBuckets.firstKey())) : getConnection(buckets.get(0));
- if (sock != null && sock.isConnected()) {
- if (aliveCheck) {
-
- if (!sock.isAlive()) {
- sock.close();
- try {
- sock.trueClose();
-
- } catch (IOException ioe) {
- log.error("failed to close dead socket");
- }
- sock = null;
- }
- }
- } else {
- if (sock != null) {
- deadPool.put(sock, ZERO);
- sock = null;
- }
- }
-
- return sock;
- }
-
- Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));
-
- long bucket = getBucket(key, hashCode);
- String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
-
- while (!tryServers.isEmpty()) {
-
- SockIO sock = getConnection(server);
- if (log.isDebugEnabled()) log.debug("cache choose " + server + " for " + key);
- if (sock != null && sock.isConnected()) {
- if (aliveCheck) {
- if (sock.isAlive()) {
- return sock;
- } else {
- sock.close();
- try {
- sock.trueClose();
- } catch (IOException ioe) {
- log.error("failed to close dead socket");
- }
- sock = null;
- }
- } else {
- return sock;
- }
- } else {
- if (sock != null) {
- deadPool.put(sock, ZERO);
- sock = null;
- }
- }
-
-
- if (!failover) return null;
-
-
- tryServers.remove(server);
-
- if (tryServers.isEmpty()) break;
-
- int rehashTries = 0;
- while (!tryServers.contains(server)) {
- String newKey = String.format("%s%s", rehashTries, key);
- if (log.isDebugEnabled()) log.debug("rehashing with: " + newKey);
-
- bucket = getBucket(newKey, null);
- server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets.get((int) bucket);
- rehashTries++;
- }
- }
- return null;
- }
下面这个方法是真正的从服务器获取连接
- public SockIO getConnection(String host) {
- if (!this.initialized) {
- log.error("attempting to get SockIO from uninitialized pool!");
- return null;
- }
-
- if (host == null) return null;
-
- synchronized (this) {
-
-
- if (availPool != null && !availPool.isEmpty()) {
-
- Map<SockIO, Long> aSockets = availPool.get(host);
- if (aSockets != null && !aSockets.isEmpty()) {
- for (Iterator<SockIO> i = aSockets.keySet().iterator(); i.hasNext();) {
- SockIO socket = i.next();
- if (socket.isConnected()) {
- if (log.isDebugEnabled()) log.debug("++++ moving socket for host (" + host
- + ") to busy pool ... socket: " + socket);
-
- i.remove();
-
- addSocketToPool(busyPool, host, socket);
-
- return socket;
- } else {
-
- deadPool.put(socket, ZERO);
-
- i.remove();
- }
- }
- }
- }
- }
-
-
- SockIO socket = createSocket(host);
- if (socket != null) {
- synchronized (this) {
- addSocketToPool(busyPool, host, socket);
- }
- }
- return socket;
- }
-
-
failover和failback
这两者都是发生在获取可用连接这个环节。
failover,如果为key选择的服务器不可用,则对key重新哈希选择下一个服务器,详见getSock方法的末尾。
failback,用一个hashmap存储连接失败的服务器和对应的失效持续时间,每次获取连接时,都探测是否到了重试时间。
代码中实际运用memcached——java
标签:style blog http io color ar os java sp
原文地址:http://www.cnblogs.com/zhouyunbaosujina/p/4081822.html