码迷,mamicode.com
首页 > 其他好文 > 详细

zookeeper 分布式锁

时间:2018-06-05 17:06:57      阅读:164      评论:0      收藏:0      [点我收藏+]

标签:task   state   mode   分布式锁   说明   param   java   rgs   time   

package org.windwant.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;

import java.io.IOException;
import java.util.concurrent.*;

/**
 * zookeeper 分布式锁
 */
public class SynZookeeperLock {
    private static final int SESSION_TIMEOUT = 30000;

    public static ZooKeeper getInstance(String domain, Watcher w){
        try {
            CountDownLatch c = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper(domain, SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        c.countDown(); // 唤醒当前正在执行的线程
                    }
                }
            });
            //阻塞直到连接完成
            c.await();
            return zk;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    /**
     * 获取分布式锁
     * 使用临时节点,避免进程获取锁后,down掉未释放锁问题
     * @param domain
     * @param path
     * @param data
     * @param c
     */
    public static void tryLock(String domain, String path, byte[] data, CountDownLatch c){
        //每次获取锁使用新的会话连接
        ZooKeeper zk = getInstance(domain, null);
        zk.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path1, ctx, name) -> {
            //节点创建成功,获取锁
            if (rc == 0) {
                System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + ", created!");
                try {
                    //模拟获取锁后3秒释放
                    Thread.sleep(3000);
                    System.out.println(Thread.currentThread().getName() + ":task complete,try release lock!");
                    zk.delete(path, -1, (rc1, path2, ctx1) -> {
                        if(rc1 == 0){
                            System.out.println(Thread.currentThread().getName() + ":lock released!");
                        }
                    }, null);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //释放等待
                c.countDown();
            } else if(rc == -110) {//节点已存在,则说明锁已被其它进程获取,则创建watch,并阻塞等待
                System.out.println(Thread.currentThread().getName() + ":result " + rc + " lock " + path + " already created, waiting!");
                try {
                    zk.getChildren(path, event -> {
                        //watch 到锁删除事件,则触发重新获取锁
                        if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                            System.out.println(Thread.currentThread().getName() + ":get node deleted event! try lock!");
                            //释放连接,避免服务器因为连接数限制
                            try {
                                zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            SynZookeeperLock.tryLock(domain, path, data, c);
                            c.countDown();
                        }
                    });
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                //-4 -112
                System.out.println(Thread.currentThread().getName() + ": connection lose or session invalid");
                c.countDown();
//                tryLock(domain, path, data, c);
            }
        }, new Object());
        try {
            //阻塞等待结果
            c.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String lockPath = "/testlock";
        byte[] lock = "lock".getBytes();
        String domain = "127.0.0.1:2181";
        //测试获取锁线程 注意服务器最大连接数限制
        for (int i = 0; i < 20; i++) {
            Thread tmp = new Thread( () -> tryLock(domain, lockPath, lock, new CountDownLatch(1)));
            tmp.start();
        }
    }
}

 项目地址:https://github.com/windwant/windwant-demo/tree/master/zookeeper-demo

zookeeper 分布式锁

标签:task   state   mode   分布式锁   说明   param   java   rgs   time   

原文地址:https://www.cnblogs.com/niejunlei/p/9140461.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!