标签:tac tst zookeeper mic ext apache class storm override
package com.example.demo3.zk;

import lombok.extern.slf4j.Slf4j;
import org.apache.storm.shade.org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Thin wrapper around a ZooKeeper client that provides a simple per-order
 * distributed lock built on ephemeral nodes.
 *
 * <p>A lock for order {@code N} is the ephemeral node {@code /orderId-lock-N}.
 * Acquiring the lock creates the node; releasing it deletes the node.
 */
@Slf4j
public class ZooKeeperSession {

    /** Connect string used by the no-argument constructor. */
    private static final String DEFAULT_CONNECT_STRING =
            "192.168.132.154:2181,192.168.132.156:2181,192.168.132.155:2181";

    /** Session timeout (milliseconds) used by the no-argument constructor. */
    private static final int DEFAULT_SESSION_TIMEOUT_MS = 50000;

    /** Prefix of every lock node; the order id is appended to it. */
    private static final String LOCK_PATH_PREFIX = "/orderId-lock-";

    /** Pause between two attempts to create a lock node, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 20L;

    /** Lock nodes carry no payload. */
    private static final byte[] EMPTY_DATA = new byte[0];

    /**
     * Released by the watcher once the client reports {@code SyncConnected}.
     * Kept per instance so that every session waits for its own connection.
     */
    private final CountDownLatch connectedLatch = new CountDownLatch(1);

    private final ZooKeeper zooKeeper;

    /**
     * Connects to the default ZooKeeper servers and blocks until the
     * connection is established.
     *
     * @throws IllegalStateException if the client cannot be created or the
     *                               calling thread is interrupted while waiting
     */
    public ZooKeeperSession() {
        this(DEFAULT_CONNECT_STRING, DEFAULT_SESSION_TIMEOUT_MS);
    }

    /**
     * Connects to the given ZooKeeper servers and blocks until the connection
     * is established.
     *
     * @param connectString    comma separated {@code host:port} list
     * @param sessionTimeoutMs session timeout in milliseconds
     * @throws IllegalStateException if the client cannot be created or the
     *                               calling thread is interrupted while waiting
     */
    public ZooKeeperSession(String connectString, int sessionTimeoutMs) {
        ZooKeeper client;
        try {
            client = new ZooKeeper(connectString, sessionTimeoutMs, new ZooKeeperWatcher(connectedLatch));
        } catch (IOException e) {
            // Fail fast: continuing with a null client would only surface later as an NPE.
            throw new IllegalStateException("Failed to create ZooKeeper client for " + connectString, e);
        }
        this.zooKeeper = client;
        log.info("状态:{}", zooKeeper.getState());

        try {
            // The constructor returns before the connection is up; wait for the watcher.
            connectedLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the ZooKeeper connection", e);
        }
        log.info("ZooKeeper session 建立......");
    }

    /**
     * Acquires the distributed lock for the given order, blocking until the
     * lock node could be created.
     *
     * <p>While the node already exists (lock held elsewhere) or the connection
     * is temporarily lost, creation is retried every {@value #RETRY_INTERVAL_MS} ms.
     *
     * @param orderId id of the order to lock
     * @throws IllegalStateException if the thread is interrupted or ZooKeeper
     *                               reports an error that retrying cannot fix
     */
    public void acquireDistributeLock(Long orderId) {
        String path = LOCK_PATH_PREFIX + orderId;
        int retries = 0;
        boolean firstAttempt = true;

        while (true) {
            try {
                if (!firstAttempt) {
                    Thread.sleep(RETRY_INTERVAL_MS);
                }
                firstAttempt = false;
                zooKeeper.create(path, EMPTY_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                break;
            } catch (KeeperException.NodeExistsException | KeeperException.ConnectionLossException e) {
                // Lock is held by someone else, or the connection dropped: try again.
                // NOTE(review): after a connection loss the create may already have been
                // applied on the server; confirm whether that case needs special handling.
                retries++;
                log.debug("lock for order[id={}] not available yet ({}), retry #{}",
                        orderId, e.code(), retries);
            } catch (KeeperException e) {
                // Any other error would fail identically on every retry, so give up.
                throw new IllegalStateException("Failed to acquire lock for order[id=" + orderId + "]", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        "Interrupted while acquiring lock for order[id=" + orderId + "]", e);
            }
        }

        if (retries == 0) {
            log.info("success to acquire lock for order[id={}]", orderId);
        } else {
            log.info("success to acquire lock for order[id={}] after {} times try......", orderId, retries);
        }
    }

    /**
     * Releases the distributed lock for the given order by deleting its node.
     * Failures are logged and not propagated, so this is safe to call from a
     * {@code finally} block.
     *
     * @param orderId id of the order to unlock
     */
    public void releaseDistributeLock(Long orderId) {
        String path = LOCK_PATH_PREFIX + orderId;
        try {
            // Version -1 deletes the node regardless of its current version.
            zooKeeper.delete(path, -1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while releasing lock for order[id={}]", orderId, e);
        } catch (KeeperException e) {
            log.error("Failed to release lock for order[id={}]", orderId, e);
        }
    }

    /**
     * Watcher passed to the client; opens the latch on the first
     * {@code SyncConnected} event.
     */
    private static class ZooKeeperWatcher implements Watcher {

        private final CountDownLatch latch;

        ZooKeeperWatcher(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void process(WatchedEvent event) {
            log.info("Receive watch event:{}", event.getState());
            if (Event.KeeperState.SyncConnected == event.getState()) {
                latch.countDown();
            }
        }
    }

    /**
     * Holder class for the shared instance; the instance is created when this
     * class is first initialized.
     */
    private static class Singleton {
        private static final ZooKeeperSession INSTANCE = new ZooKeeperSession();
    }

    /**
     * Returns the shared session instance.
     *
     * @return the shared {@code ZooKeeperSession}
     */
    public static ZooKeeperSession getInstance() {
        return Singleton.INSTANCE;
    }

    /**
     * Forces creation of the shared session instance.
     */
    public static void init() {
        getInstance();
    }
}
调用方法:
package com.example.demo3;

import com.example.demo3.zk.ZooKeeperSession;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

/**
 * Exercises the ZooKeeper based distributed lock end to end:
 * acquire the lock, run the guarded section, release the lock.
 */
@Slf4j
public class TestZooKeeper extends Demo3ApplicationTests {

    /**
     * Acquires and releases the lock for a single order id.
     */
    @Test
    public void testZookeeper() {
        Long orderId = 1L;
        ZooKeeperSession zooKeeperSession = new ZooKeeperSession();

        log.info("获取锁");
        zooKeeperSession.acquireDistributeLock(orderId);
        try {
            log.info("执行业务逻辑...");
        } finally {
            // Release in finally so a failure in the guarded section cannot leave the lock held.
            zooKeeperSession.releaseDistributeLock(orderId);
            log.info("释放锁");
        }
    }
}
运行结果:
源码下载地址:
链接:https://pan.baidu.com/s/1rgyoxf9lLTjDIWX-Ro5o-Q
提取码:ke31
标签:tac tst zookeeper mic ext apache class storm override
原文地址:https://www.cnblogs.com/xiaozw/p/11926816.html