标签:
// 构造方法
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode)
// 查看当前LeaderLatch实例是否是leader
public boolean hasLeadership()
// 阻塞等待,直到当前LeaderLatch实例成为leader(带超时参数的版本最多等待指定时间)
public void await() throws InterruptedException, EOFException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
/**
 * Demonstrates leader election with Curator's {@code LeaderLatch}.
 *
 * <p>The program starts {@link #CLIENT_QTY} clients that all compete on the same
 * latch path, waits for an election, closes the elected latch, and then prints
 * who the leader is afterwards. All latches and clients are closed on exit.
 */
public class LeaderLatchExample
{
    /** Number of competing clients/latches to create. */
    private static final int CLIENT_QTY = 10;

    /** Path shared by every latch taking part in the election. */
    private static final String PATH = "/examples/leader";

    /**
     * Runs the demo against the server at {@code 127.0.0.1:2181}.
     *
     * @param args unused
     * @throws Exception declared for parity with the Curator calls used here;
     *                   failures in the main flow are caught and printed
     */
    public static void main(String[] args) throws Exception
    {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        try
        {
            for (int i = 0; i < CLIENT_QTY; ++i)
            {
                CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
                // Register before start() so the finally block closes it even if start() fails.
                clients.add(client);
                client.start();
                LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);
                examples.add(example);
                example.start();
            }
            System.out.println("LeaderLatch初始化完成!");
            Thread.sleep(10 * 1000); // give the election time to complete

            LeaderLatch currentLeader = findLeader(examples);
            if (currentLeader == null)
            {
                // No latch reported leadership within the wait (e.g. server unreachable);
                // report it instead of dereferencing null.
                System.out.println("当前没有leader");
            }
            else
            {
                System.out.println("当前leader:" + currentLeader.getId());
                // Closing the leading latch is what hands leadership to another participant.
                currentLeader.close();
                // Bounded wait on the first latch, then ask it who leads now.
                examples.get(0).await(10, TimeUnit.SECONDS);
                System.out.println("当前leader:" + examples.get(0).getLeader());
            }
            System.out.println("输入回车退出");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            for (LeaderLatch exampleClient : examples)
            {
                // Querying the leader can fail; it must not prevent the close calls below.
                try
                {
                    System.out.println("当前leader:" + exampleClient.getLeader());
                }
                catch (Exception e)
                {
                    System.out.println(exampleClient.getId() + " -- " + e.getMessage());
                }
                // The latch closed in the main flow is closed a second time here; that
                // surfaces as an exception closeQuietly does not suppress, so report it
                // and carry on with the remaining latches.
                try
                {
                    CloseableUtils.closeQuietly(exampleClient);
                }
                catch (Exception e)
                {
                    System.out.println(exampleClient.getId() + " -- " + e.getMessage());
                }
            }
            for (CuratorFramework client : clients)
            {
                CloseableUtils.closeQuietly(client);
            }
        }
        System.out.println("OK!");
    }

    /**
     * Returns the first latch that currently reports leadership.
     *
     * @param latches latches to inspect
     * @return the leading latch, or {@code null} when none holds leadership
     */
    private static LeaderLatch findLeader(List<LeaderLatch> latches)
    {
        for (LeaderLatch latch : latches)
        {
            if (latch.hasLeadership())
            {
                return latch;
            }
        }
        return null;
    }
}
LeaderLatch初始化完成!
当前leader:Client #1
当前leader:Participant{id='Client #8', isLeader=true}
输入回车退出
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
Client #1 -- Already closed or has not been started
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #8', isLeader=true}
当前leader:Participant{id='Client #9', isLeader=true}
OK!
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)
/**
 * Election participant backed by a {@code LeaderSelector}.
 *
 * <p>The instance registers itself as the selector's listener, so
 * {@link #takeLeadership(CuratorFramework)} runs whenever this client is chosen.
 */
public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable
{
    /** Label used in every console message printed by this participant. */
    private final String name;

    /** Selector that drives the election for this participant. */
    private final LeaderSelector leaderSelector;

    /** How many times this participant has been handed leadership so far. */
    private final AtomicInteger leaderCount = new AtomicInteger();

    /**
     * Creates a participant competing on {@code path}.
     *
     * @param client Curator client handed to the selector
     * @param path   election path handed to the selector
     * @param name   label for console output
     */
    public ExampleClient(CuratorFramework client, String path, String name)
    {
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);
        // NOTE(review): per Curator's LeaderSelector docs this re-enters the election
        // after leadership is relinquished — confirm against the version in use.
        this.leaderSelector.autoRequeue();
    }

    /**
     * Starts the underlying selector.
     *
     * @throws IOException declared by the signature
     */
    public void start() throws IOException
    {
        leaderSelector.start();
    }

    /**
     * Closes the underlying selector.
     *
     * @throws IOException declared by the signature
     */
    @Override
    public void close() throws IOException
    {
        leaderSelector.close();
    }

    /**
     * Invoked when this participant becomes leader: announces it, holds for one
     * second, then returns after printing that leadership is given up.
     *
     * @param client the Curator client (unused here)
     * @throws Exception declared by the listener contract
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        final int holdSeconds = 1;

        final boolean leading = leaderSelector.hasLeadership();
        System.out.println(name + " 是当前的leader(" + leading + ") 等待" + holdSeconds + "秒...");

        // getAndIncrement yields the count *before* this term, which is what gets printed.
        final int earlierTerms = leaderCount.getAndIncrement();
        System.out.println(name + " 之前成为leader的次数:" + earlierTerms + "次");

        try
        {
            TimeUnit.SECONDS.sleep(holdSeconds);
        }
        catch (InterruptedException interrupted)
        {
            System.err.println(name + " 已被中断");
            // Keep the interrupt visible to whoever invoked this callback.
            Thread.currentThread().interrupt();
        }
        finally
        {
            System.out.println(name + " 放弃leader\n");
        }
    }
}
/**
 * Demonstrates leader election with {@code LeaderSelector} by starting
 * {@link #CLIENT_QTY} {@code ExampleClient} participants on one path and letting
 * them run until the user presses Enter.
 */
public class LeaderSelectorExample
{
    /** Number of participants to start. */
    private static final int CLIENT_QTY = 10;

    /** Election path shared by all participants. */
    private static final String PATH = "/examples/leader";

    /**
     * Starts the participants, waits for a line on standard input, then shuts
     * everything down.
     *
     * @param args unused
     * @throws Exception if creating or starting a client/participant, or reading
     *                   standard input, fails
     */
    public static void main(String[] args) throws Exception
    {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<ExampleClient> examples = Lists.newArrayList();
        try
        {
            int index = 0;
            while (index < CLIENT_QTY)
            {
                ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3);
                CuratorFramework curator = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retry);
                // Tracked before start() so shutdown still reaches it if start() throws.
                clients.add(curator);
                curator.start();

                ExampleClient participant = new ExampleClient(curator, PATH, "Client #" + index);
                examples.add(participant);
                participant.start();

                index++;
            }

            System.out.println("输入回车退出:");
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            console.readLine();
        }
        finally
        {
            shutdown(examples, clients);
        }
        System.out.println("OK!");
    }

    /**
     * Closes every participant first, then every Curator client, via
     * {@code CloseableUtils.closeQuietly}.
     */
    private static void shutdown(List<ExampleClient> participants, List<CuratorFramework> curators)
    {
        for (ExampleClient participant : participants)
        {
            CloseableUtils.closeQuietly(participant);
        }
        for (CuratorFramework curator : curators)
        {
            CloseableUtils.closeQuietly(curator);
        }
    }
}
输入回车退出:
Client #4 是当前的leader(true) 等待1秒...
Client #4 之前成为leader的次数:0次
Client #4 放弃leader
Client #5 是当前的leader(true) 等待1秒...
Client #5 之前成为leader的次数:0次
Client #5 已被中断
Client #5 放弃leader
OK!
通过LeaderSelectorListener可以对领导权进行控制,在适当的时候释放领导权,这样每个节点都有可能获得领导权。而LeaderLatch则一根筋到底:除非调用close方法,否则它不会释放领导权。
标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4930486.html