码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper分布式锁

时间:2015-11-09 00:19:48      阅读:254      评论:0      收藏:0      [点我收藏+]

标签:

1、pom.xml中添加zookeeper依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

2、DistributedLock.java

package com.zk.dlm;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Created by Administrator on 2015/11/8.
 */
public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zk = null;

    private String root = "/locks";//根
    private String lockName;//竞争资源的标志
    private String waitNode;//等待前一个锁
    private String myZnode;//当前锁

    private CountDownLatch latch;//计数器

    private int sessionTimeout = 5000;

    private boolean isGetLock = false;

    static volatile AtomicInteger count = new AtomicInteger(0);

    private DistributedLock(){

    }

    public static DistributedLock instanceLock(String lockName){
        return new DistributedLock(lockName);
    }

    /**
     * 创建分布式锁,使用前请确认config配置的zookeeper服务可用
     * @param config 127.0.0.1:2181
     * @param lockName 竞争资源标志,lockName中不能包含单词lock
     */
    private DistributedLock(String lockName){
        this.lockName = lockName;
        // 创建一个与服务器的连接
        try {
            zk = initZk();
            Stat stat = zk.exists(root, false);
            if(stat == null){
                // 创建根节点
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }


    /**
     * zookeeper节点的监视器
     */
    public void process(WatchedEvent event) {
        if(this.latch != null) {
            this.latch.countDown();
        }
    }

    public void lock() {
        try {
            if(this.tryLock()){
                //System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//等待锁
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr)){
                throw new LockException("lockName can not contains \\u000B");
            }
            //创建临时子节点
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            //System.out.println(myZnode + " is created ");

            //取出所有子节点
            List<String> subNodes = zk.getChildren(root, false);
            //取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            //System.out.println(myZnode + "==" + lockObjNodes.get(0));

            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //如果是最小的节点,则表示取得锁
                return true;
            }
            //如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }

    @SuppressWarnings("finally")
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            throw new LockException(e);
        }finally{
            return false;
        }
    }

    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);
        //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
        if(stat != null){
            //System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            isGetLock = this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            //System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            //zk.close();
        } catch (InterruptedException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        }
    }

    public synchronized ZooKeeper initZk() {
        try {
            if(zk==null){
                zk = new ZooKeeper("127.0.0.1:2181", sessionTimeout,this);
            }

        } catch (IOException e) {
            throw new LockException("zk init connect fail" + e.getMessage());
            //System.err.println("zk init connect fail" + e.getMessage());
        }
        return zk;
    }

    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    public Condition newCondition() {
        return null;
    }


    public boolean isGetLock() {
        return isGetLock;
    }

    class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }

    public static void main(String[] args) throws Exception {
        final long starttime = System.currentTimeMillis();

        for(int i=0;i<30;i++){
            new Thread(new Runnable() {
                public void run() {
                    DistributedLock lock = DistributedLock.instanceLock("mylock");;
                    while(true){
                        try {
                            lock.lock();

                            count.incrementAndGet();
                            System.err.println(System.currentTimeMillis()+"|"+Thread.currentThread().getId() + " | lock value: " + count.get());

                        } catch (Exception e) {
                            e.printStackTrace();
                        }finally{
                            lock.unlock();
                            long endtime = System.currentTimeMillis();
                            System.err.println(count.get()/((endtime-starttime)/1000)+"/s");
                        }

                    }

                }
            }).start();

        }

        //Thread.sleep(10000);
    }
}

参考

Curator 实现的zk分布式锁

redisson 实现的redis分布式锁

如果规模很大,推荐使用 Curator;如果规模不是特别大,用 redisson 就可以。

http://www.jiacheo.org/blog/620 

http://www.jiacheo.org/blog/122 

http://blog.csdn.net/zhu_tianwei/article/details/44927331 

https://github.com/mrniko/redisson 

http://www.pandablog.cn/41.html 

http://songwie.com/ 


http://blog.csdn.net/zhu_tianwei/article/details/44927331   jedis实现

http://newliferen.github.io/2015/07/27/ZooKeeper%E5%BA%94%E7%94%A8%E4%B9%8B%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81/ 

https://github.com/sfines/menagerie 

https://github.com/xing4git/blog/blob/master/zookeeper/ZooKeeper%E7%A4%BA%E4%BE%8B%20%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81.md 

http://www.111cn.net/jsp/Java/95461.htm 

http://itfish.net/article/23060.html 

http://www.qkeye.com/blog-37-456727.html 


zookeeper分布式锁

标签:

原文地址:http://my.oschina.net/u/1757031/blog/527811

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!