标签:
最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架。
网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现。
下面介绍下锁的实现:
通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分。
curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现.
-----------------------------------------------------------------------------------------------------------------------------
1.看下锁的定义,将线程对象和锁对象(线程、路径、锁的数量)关联
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath) { this.owningThread = owningThread; this.lockPath = lockPath; } }
2.获得锁:
private final LockInternals internals;
@Override public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
3.可以看到,逻辑是基于LockInternals的,来看一下他是怎么做到的
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {while ( !isDone ) { isDone = true; try { ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } } } private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { client.getData().usingWatcher(watcher).forPath(previousSequencePath); } } } } }return haveTheLock; }
4.分析下前文中的driver实现,即StandardLockInternalsDriver 关键代码
@Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); } @Override public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath; if ( lockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } return ourPath; }
先发布,后续有心得再续。
标签:
原文地址:http://www.cnblogs.com/it-worker365/p/4439191.html