码迷,mamicode.com
首页 > 其他好文 > 详细

curator 实现分布式一致性锁

时间:2015-04-19 16:03:19      阅读:226      评论:0      收藏:0      [点我收藏+]

标签:

最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架。

网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现。

下面介绍下锁的实现:

通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分。

curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现.

-----------------------------------------------------------------------------------------------------------------------------

1.看下锁的定义,将线程对象和锁对象(线程、路径、锁的数量)关联

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

2.获得锁:

  • 如果线程已经有锁,则增加锁的数量,返回  String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  • 否则尝试获取锁,获得则加入线程持有锁的MAP,否则返回未获得锁。
    private final LockInternals internals;    

@Override
public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }

3.可以看到,逻辑是基于LockInternals的,来看一下他是怎么做到的

  • 通过dirver在锁目录下创建EPHEMERAL_SEQUENTIAL节点
  • 循环尝试获取
  1. 获取锁目录下子节点的有序集合
  2. 通过dirver尝试得到PredicateResults(含有是否得到锁及需要监视的目录)
  3. 更新监听信息,开始下一轮尝试
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {while ( !isDone )
        {
            isDone = true;

            try
            {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
        }
    }

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        }
                    }
                }
            }
        }return haveTheLock;
    }

 4.分析下前文中的driver实现,即StandardLockInternalsDriver 关键代码

  • 创建锁是基于目录下创建的EPHEMERAL_SEQUENTIAL节点,即与客户端生命周期相同,并且名字后自动加创建的序列号
  • 得到PredicateResult,如果当前节点为最小节点,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为pathToWatch,监控之
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {
        int             ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);

        boolean         getsTheLock = ourIndex < maxLeases;
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

 

先发布,后续有心得再续。

 

curator 实现分布式一致性锁

标签:

原文地址:http://www.cnblogs.com/it-worker365/p/4439191.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!