/**
 * Index record kept in the remote cache, one per namespace.
 *
 * <p>{@code DisasterCacheHandler} stores an instance of this class under the
 * namespace's id and bumps {@link #currentIndex} every time it records another
 * key for that namespace. The fields are deliberately left public and mutable
 * because the handler reads them directly.
 */
public class DisasterIndexDO implements Serializable {

    private static final long serialVersionUID = -8688243351154917184L;

    /**
     * Maximum age of a lock before it is considered stale. The handler compares
     * this against {@code System.currentTimeMillis() - expireLockTime}, so the
     * unit is milliseconds. A stale lock is released to avoid a deadlock.
     */
    public static final long DEFAULT_EXPIRE_TIME = 50;

    /** Namespace this index belongs to. */
    public int namespace;

    /** First index value of the key sequence. */
    public int beginIndex;

    /** Most recently assigned index value of the key sequence. */
    public int currentIndex;

    /**
     * Timestamp used for lock-expiry checks; the handler fills it with
     * {@code System.currentTimeMillis()} whenever it writes the index.
     */
    public long expireLockTime;

    /**
     * Creates an index record.
     *
     * @param namespace      namespace the index belongs to
     * @param bIndex         initial value for {@link #beginIndex}
     * @param cIndex         initial value for {@link #currentIndex}
     * @param expireLockTime initial value for {@link #expireLockTime}
     */
    public DisasterIndexDO(int namespace, int bIndex, int cIndex, long expireLockTime) {
        this.namespace = namespace;
        this.beginIndex = bIndex;
        this.currentIndex = cIndex;
        this.expireLockTime = expireLockTime;
    }
}
//容灾实现类 public class DisasterCacheHandler { RemoteCache remoteCache; private final int DS_CACHE_NAMESPACE = 67; private final int DS_WRITE_REPETECOUNT = 3; private final int DISASTER_INDEX = 250; private final String DISASTER_KEYS = "disaster_keys"; //容灾读 public Object dsGetRemoteData(int namespace, String key){ return remoteCache.get(DS_CACHE_NAMESPACE, namespace+key); } //本地同步的namespace private Map<Integer, Object> synNamespace = new HashMap<Integer, Object>(); //容灾写 protected boolean dsWriteRemoteData(int namespace, String key, Serializable value) { //先把数据写入缓存 remoteCache.put(this.DS_CACHE_NAMESPACE, namespace+key, value); //本地同步的NameSpace if (!this.synNamespace.containsKey(Integer.valueOf(namespace))) { this.synNamespace.put(Integer.valueOf(namespace), namespace); } // update the Namespace Index and namespace disaster key queue synchronized (this.synNamespace.get(namespace)){ int count = this.DS_WRITE_REPETECOUNT; DisasterIndexDO index = null; do { count--; // try to lock Namespace Index int rc = remoteCache.lock(this.DISASTER_INDEX, String.valueOf(namespace)); // Namespace Index not exist if ( rc == 2 ) { // Initialize Namespace Index index = new DisasterIndexDO(namespace, 1, 1, System.currentTimeMillis()); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), index); // for each namespace should handle disaster, keep the only index object remoteCache.put(namespace, this.DISASTER_KEYS + index.currentIndex, key); return true; } else if (rc == 0) { // lock failure index = (DisasterIndexDO)remoteCache. 
get(this.DISASTER_INDEX, String.valueOf(namespace)); // 如果鎖已經超時,則解开,避免在访问缓存时死住的情况 if (System.currentTimeMillis() - index.expireLockTime > DisasterIndexDO.DEFAULT_EXPIRE_TIME) { remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); continue; } continue; } else if (rc == 1) { // lock success try { index = (DisasterIndexDO)remoteCache.get(this.DISASTER_INDEX, String.valueOf(namespace)); // update locked Namespace Index int curIdx = index.currentIndex + 1; remoteCache.delete(this.DISASTER_INDEX, String.valueOf(namespace)); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), new DisasterIndexDO(namespace, index.beginIndex, curIdx, System.currentTimeMillis())); // keep key of this Namespace with current index remoteCache.put(namespace, this.DISASTER_KEYS + curIdx, key); return true; } catch (Throwable e ) { } finally { // unlock and handle unlock failure remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); } } } while (count >= 0); if (count <= 0) { return false; } } } }
// Original article (原文地址): http://blog.csdn.net/troy__/article/details/39698731