标签:类型 启动 file dem 实现 ble error .com star
Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。
下面放上一个简陋的图以便理解。
要解决的问题如下:
public class ZookeeperClient { public static ZooKeeper zooKeeper; public static final String IP_ADDRESS="xxxx:2181"; public static void init() throws Exception{ zooKeeper = new ZooKeeper(IP_ADDRESS, 15000, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState()== Event.KeeperState.SyncConnected) { } } }); } public static String createTempNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("node "+ path +" with data is created,return node "+node); return node; } catch (KeeperException e) { e.printStackTrace(); return "ERROR"; } catch (InterruptedException e) { e.printStackTrace(); return "ERROR"; } } public static boolean delete(String path,int version) { try { zooKeeper.delete(path,version); System.out.println("delete path:"+ path + "success"); return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (KeeperException e) { e.printStackTrace(); return false; } } public static boolean createPersistentNode(String path,String data) { try { String node = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("node "+ path +" with data is created"); return true; } catch (KeeperException e) { e.printStackTrace(); return false; } catch (InterruptedException e) { e.printStackTrace(); return false; } } public static boolean checkExists(String path){ try { Stat stat = zooKeeper.exists(path,true); if(stat!=null) { return true; } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public static int getChildrens(String path) { try { return zooKeeper.getChildren(path,true).size(); } catch (KeeperException e) { e.printStackTrace(); return -1; } catch (InterruptedException e) { e.printStackTrace(); return -1; } } }
Barrier类,负责实现屏障的功能
public class Barrier { private int size; private String rootPath; public Barrier(int size,String rootPath){ this.rootPath = rootPath; this.size = size; } public void init() throws Exception { ZookeeperClient.init(); if(!ZookeeperClient.checkExists(rootPath)){ ZookeeperClient.createPersistentNode(rootPath,"1"); } } public boolean enter(String name,String number){ ZookeeperClient.createTempNode(rootPath+"/"+name, number); //如果节点下children的数量没有达到所有线程的总数,则继续轮询。 //此时要等待所有的线程都在根节点下创建了节点,才开始执行 while(true) { int size=ZookeeperClient.getChildrens(rootPath); if (size != this.size) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } public boolean exit(String name){ //先删除自己的节点 ZookeeperClient.delete(rootPath+"/"+name,0); //如果节点下children数量大于0,则继续轮询 //此时要等待所有的线程都删除了节点,即所有线程都做完了该做的事情,才结束线程。确保所有的线程同时结束。 while(true){ int size = ZookeeperClient.getChildrens(rootPath); if(size!=0) { System.out.println("The current children node under "+rootPath+" is " + size+", still need waiting"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { return true; } } } }
BarrierDemo类,负责启动线程,为了模拟等待,我为3个线程设置了不同的休眠时间。
预想的结果是t3首先删除节点,此时子节点剩下两个(t1和t2),t3不会结束,而是继续轮询。
t2随后删除节点,此时子节点剩下1个(t1),t2和t3继续轮询。
t3最后删除节点,此时没有子节点,t1,t2,t3全部结束。
public class BarrierDemo { public static void main(String[] args) throws Exception{ Barrier barrier = new Barrier(3,"/barrier"); barrier.init(); Worker worker1 = new Worker(barrier,10000); Worker worker2 = new Worker(barrier,5000); Worker worker3 = new Worker(barrier,2000); Thread t1 = new Thread(worker1,"t1"); Thread t2 = new Thread(worker2,"t2"); Thread t3 = new Thread(worker3,"t3"); t1.start(); t2.start(); t3.start(); } } class Worker implements Runnable { Barrier barrier; long time; Worker(Barrier barrier, long time){ this.barrier = barrier; this.time = time; } public void run() { boolean isEnter=barrier.enter(Thread.currentThread().getName(),"0"); if(isEnter) { System.out.println(Thread.currentThread().getName()+"is working on something important now"); try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } boolean isExit=barrier.exit(Thread.currentThread().getName()); if (isExit) { System.out.println(Thread.currentThread().getName()+"is exiting.."); } } }
运行结果如下
标签:类型 启动 file dem 实现 ble error .com star
原文地址:http://www.cnblogs.com/qingfei1994/p/7670326.html