/**
 * @param client client
 * @param barrierPath path to use as the barrier
 */
public DistributedBarrier(CuratorFramework client, String barrierPath)
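import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedBarrierExample
{
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception
    {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        // The controller sets the barrier before any worker starts waiting on it
        DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
        controlBarrier.setBarrier();
        for (int i = 0; i < QTY; ++i)
        {
            final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
            final int index = i;
            Callable<Void> task = new Callable<Void>()
            {
                @Override
                public Void call() throws Exception
                {
                    // Short random delay (0 to 3 ms) so the clients arrive in varying order
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waiting");
                    // Blocks until the controller removes the barrier
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " started");
                    return null;
                }
            };
            service.submit(task);
        }
        // Give every client time to reach the barrier, then release them all at once
        Thread.sleep(1000 * 3);
        System.out.println("All clients are waiting");
        controlBarrier.removeBarrier();
        service.shutdown();
        service.awaitTermination(10, TimeUnit.MINUTES);
        client.close();
        System.out.println("OK!");
    }
}

Sample output (the order of the clients varies between runs):

Client #1 waiting
Client #2 waiting
Client #0 waiting
Client #4 waiting
Client #3 waiting
All clients are waiting
Client #4 started
Client #2 started
Client #0 started
Client #3 started
Client #1 started
OK!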
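
The set / wait / remove pattern shown above can also be tried without a ZooKeeper server. The following is only a rough single-JVM analogy, not Curator code: it assumes that a `java.util.concurrent.CountDownLatch` with a count of 1 can stand in for the barrier node, so creating the latch plays the role of `setBarrier()`, `await()` the role of `waitOnBarrier()`, and `countDown()` the role of `removeBarrier()`. The class `LocalBarrierSketch` and its `run` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical single-JVM analogy of the barrier recipe; it does not use Curator or ZooKeeper.
class LocalBarrierSketch
{
    // Starts `workers` tasks that block on a latch and returns how many were released.
    static int run(int workers) throws Exception
    {
        ExecutorService service = Executors.newFixedThreadPool(workers);
        try
        {
            // Count of 1: stands in for setBarrier()
            final CountDownLatch barrier = new CountDownLatch(1);
            // Lets the controller know that every worker has reached the barrier
            final CountDownLatch arrived = new CountDownLatch(workers);
            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < workers; ++i)
            {
                Callable<Boolean> task = () -> {
                    arrived.countDown();
                    barrier.await();          // stands in for waitOnBarrier()
                    return Boolean.TRUE;
                };
                results.add(service.submit(task));
            }
            arrived.await();
            // No worker can have finished yet, because the latch is still closed
            for (Future<Boolean> result : results)
            {
                if (result.isDone())
                {
                    throw new IllegalStateException("a worker passed the barrier too early");
                }
            }
            barrier.countDown();              // stands in for removeBarrier()
            int released = 0;
            for (Future<Boolean> result : results)
            {
                if (result.get(5, TimeUnit.SECONDS))
                {
                    ++released;
                }
            }
            return released;
        }
        finally
        {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("Released " + run(5) + " of 5 workers");
    }
}
```

The analogy only covers the blocking behaviour inside one process. The Curator recipe coordinates separate processes through a ZooKeeper node, so connection loss and retries, which the latch does not model, still have to be handled there.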
// client - the client
// barrierPath - path to use
// memberQty - the number of members in the barrier
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)
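import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class DistributedBarrierDoubleExample
{
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception
    {
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        // Submit two more tasks than the barrier has members; the pool runs only QTY at a time
        for (int i = 0; i < (QTY + 2); ++i)
        {
            final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
            final int index = i;
            Callable<Void> task = new Callable<Void>()
            {
                @Override
                public Void call() throws Exception
                {
                    // Short random delay (0 to 3 ms) so the clients arrive in varying order
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waiting");
                    // Wait up to 5 seconds for QTY members to enter the barrier
                    if (!barrier.enter(5, TimeUnit.SECONDS))
                    {
                        System.out.println("Client #" + index + " timed out waiting!");
                        return null;
                    }
                    System.out.println("Client #" + index + " entered");
                    // Simulate up to 3 seconds of work, then wait for the others to leave
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " finished");
                    return null;
                }
            };
            service.submit(task);
        }
        service.shutdown();
        service.awaitTermination(10, TimeUnit.MINUTES);
        client.close();
        System.out.println("OK!");
    }
}

Sample output (the order of the clients varies between runs):

Client #0 waiting
Client #2 waiting
Client #3 waiting
Client #4 waiting
Client #1 waiting
Client #4 entered
Client #2 entered
Client #0 entered
Client #1 entered
Client #3 entered
Client #4 finished
Client #5 waiting
Client #2 finished
Client #3 finished
Client #6 waiting
Client #0 finished
Client #1 finished
Client #5 timed out waiting!
Client #6 timed out waiting!
OK!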
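
The enter / work / leave rhythm of the double barrier can be approximated inside one JVM as well. The sketch below is an analogy under stated assumptions, not Curator code: it assumes a reusable `java.util.concurrent.CyclicBarrier` sized to the member count can stand in for `DistributedDoubleBarrier`, with the first `await` playing the role of `enter` and the second the role of `leave`. One difference to keep in mind: `enter(5, TimeUnit.SECONDS)` reports a timeout by returning `false`, whereas `CyclicBarrier.await` throws a `TimeoutException`. `LocalDoubleBarrierSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM analogy of the double barrier recipe; it does not use Curator or ZooKeeper.
class LocalDoubleBarrierSketch
{
    // Returns true if every member started only after all had arrived
    // and left only after all had finished their work.
    static boolean run(final int members) throws Exception
    {
        ExecutorService service = Executors.newFixedThreadPool(members);
        try
        {
            final CyclicBarrier barrier = new CyclicBarrier(members);
            final AtomicInteger arrived = new AtomicInteger();
            final AtomicInteger worked = new AtomicInteger();
            List<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
            for (int i = 0; i < members; ++i)
            {
                Callable<Boolean> task = () -> {
                    arrived.incrementAndGet();
                    barrier.await(5, TimeUnit.SECONDS);   // stands in for enter(5, TimeUnit.SECONDS)
                    boolean enteredTogether = arrived.get() == members;
                    worked.incrementAndGet();             // the "work" phase
                    barrier.await(5, TimeUnit.SECONDS);   // stands in for leave()
                    boolean leftTogether = worked.get() == members;
                    return enteredTogether && leftTogether;
                };
                results.add(service.submit(task));
            }
            boolean ok = true;
            for (Future<Boolean> result : results)
            {
                ok &= result.get(10, TimeUnit.SECONDS);
            }
            return ok;
        }
        finally
        {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("Members entered and left together: " + run(5));
    }
}
```

The pool is sized to the member count on purpose: with fewer threads than members, the first `await` could never be satisfied and every task would time out.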
Original article: http://www.cnblogs.com/LiZhiW/p/4937547.html