标签:
/**
 * Demo of Curator's {@code SharedCount} recipe.
 *
 * <p>One "base" counter registers this class as a listener, and {@link #QTY} further
 * {@code SharedCount} instances bound to the same ZooKeeper path are each updated from a
 * worker thread. The listener callbacks print connection-state changes and counter changes.
 */
public class SharedCounterExample implements SharedCountListener
{
    /** Number of worker tasks, and of {@code SharedCount} instances they drive. */
    private static final int QTY = 5;

    /** ZooKeeper node path shared by every counter instance in this example. */
    private static final String PATH = "/examples/counter";

    /**
     * Runs the demo against a ZooKeeper server at {@code 127.0.0.1:2181}.
     *
     * <p>All resources (worker pool, counters, client) are released in {@code finally}
     * blocks so that a failure during start-up or task submission does not leave
     * non-daemon threads or the ZooKeeper connection behind.
     *
     * @param args unused
     * @throws Exception if starting the client or a counter fails, if waiting for the
     *                   workers is interrupted, or if closing a resource fails
     */
    public static void main(String[] args) throws IOException, Exception
    {
        final Random rand = new Random();
        SharedCounterExample example = new SharedCounterExample();
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        List<SharedCount> examples = Lists.newArrayList();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        SharedCount baseCount = null;
        try
        {
            client.start();

            // The base counter only observes: it carries the listener that prints changes.
            baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            for (int i = 0; i < QTY; ++i)
            {
                final SharedCount count = new SharedCount(client, PATH, 0);
                // Remember the instance before submitting so it is closed even if the task fails.
                examples.add(count);
                Callable<Void> task = new Callable<Void>()
                {
                    @Override
                    public Void call() throws Exception
                    {
                        count.start();
                        // Random delay so the workers' updates interleave.
                        Thread.sleep(rand.nextInt(10000));
                        count.setCount(rand.nextInt(10000));
                        System.out.println("计数器当前值:" + count.getVersionedValue().getValue());
                        System.out.println("计数器当前版本:" + count.getVersionedValue().getVersion());
                        // Conditional update: passes the versioned value this instance currently holds.
                        System.out.println("trySetCount:" + count.trySetCount(count.getVersionedValue(), 123));
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
        finally
        {
            // Reached on failure or on await timeout: stop workers before closing their counters.
            if (!service.isTerminated())
            {
                service.shutdownNow();
            }
            try
            {
                for (SharedCount count : examples)
                {
                    count.close();
                }
                if (baseCount != null)
                {
                    baseCount.close();
                }
            }
            finally
            {
                // Always release the ZooKeeper connection, even if a counter failed to close.
                client.close();
            }
        }
        System.out.println("OK!");
    }

    /**
     * Prints the new connection state.
     *
     * @param client   the client whose connection state changed
     * @param newState the state being reported
     */
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        System.out.println("连接状态: " + newState.toString());
    }

    /**
     * Prints the new counter value.
     *
     * @param sharedCount the counter reader that reported the change
     * @param newCount    the value being reported
     * @throws Exception declared by the listener interface; not thrown here
     */
    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
    {
        System.out.println("计数器值改变:" + newCount);
    }
}
连接状态: CONNECTED
计数器当前值:1684
计数器当前版本:11
trySetCount:true
计数器值改变:123
计数器当前值:8425
计数器当前版本:13
trySetCount:true
计数器值改变:123
计数器当前值:9369
计数器当前版本:15
trySetCount:true
计数器值改变:123
计数器当前值:4075
计数器当前版本:17
trySetCount:true
计数器值改变:123
计数器当前值:9221
计数器当前版本:19
trySetCount:true
OK!
/**
 * Abridged excerpt of {@code DistributedAtomicLong}; the "......" line stands for code
 * omitted by the article and is not valid Java.
 *
 * <p>What the excerpt shows: the class implements {@code DistributedAtomicNumber<Long>}
 * and holds a single {@code DistributedAtomicValue} field.
 */
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
{
// The only member visible in this excerpt.
// NOTE(review): presumably the Long operations delegate to it — confirm against the full source.
private final DistributedAtomicValue value;
......
}
/**
 * Abridged excerpt of {@code DistributedAtomicValue}; the "......" lines stand for code
 * omitted by the article (including the {@code mutex} field and the two helper methods
 * used below) and are not valid Java.
 */
public class DistributedAtomicValue
{
......
/**
 * Attempts to set a new value in two stages and returns the outcome holder.
 *
 * @param makeValue passed unchanged to both attempt helpers
 *                  (NOTE(review): presumably computes the new bytes from the current ones — confirm)
 * @return the result object; it is returned whether or not an attempt succeeded
 * @throws Exception propagated from the attempt helpers
 */
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
// Result holder created with (null, null, false) and filled in by the helpers below.
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
// First attempt. NOTE(review): by its name an optimistic (lock-free) update — confirm in full source.
tryOptimistic(result, makeValue);
// Second attempt runs only if the first did not succeed and a mutex is configured.
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}
......
}
/**
 * Demo of Curator's {@code DistributedAtomicLong} recipe: {@link #QTY} worker threads each
 * increment a counter stored at the same ZooKeeper path and print the outcome.
 */
public class DistributedAtomicLongExample
{
    /** Number of worker tasks, each with its own {@code DistributedAtomicLong} instance. */
    private static final int QTY = 5;

    /** ZooKeeper node path shared by every counter instance in this example. */
    private static final String PATH = "/examples/counter";

    /**
     * Runs the demo against a ZooKeeper server at {@code 127.0.0.1:2181}.
     *
     * <p>The worker pool and the client are released in a {@code finally} block so that a
     * failure while starting or submitting does not leave non-daemon threads or the
     * ZooKeeper connection behind.
     *
     * @param args unused
     * @throws Exception if starting the client fails or waiting for the workers is interrupted
     */
    public static void main(String[] args) throws IOException, Exception
    {
        final Random rand = new Random();
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        try
        {
            client.start();
            for (int i = 0; i < QTY; ++i)
            {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
                Callable<Void> task = new Callable<Void>()
                {
                    @Override
                    public Void call() throws Exception
                    {
                        try
                        {
                            // Random delay (1s to 11s) so the workers' increments interleave.
                            Thread.sleep(1000 + rand.nextInt(10000));
                            AtomicValue<Long> value = count.increment();
                            System.out.println("修改成功: " + value.succeeded());
                            // Pre/post values are only printed when the increment succeeded.
                            if (value.succeeded())
                            {
                                System.out.println("修改之前的值:" + value.preValue() + " | 修改之后的值:" + value.postValue());
                            }
                        }
                        catch (InterruptedException e)
                        {
                            // Restore the interrupt flag so the pool can observe the cancellation.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                        catch (Exception e)
                        {
                            // Report and continue: one failed increment should not abort the demo.
                            e.printStackTrace();
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
        finally
        {
            // Reached on failure or on await timeout: stop workers before closing the client.
            if (!service.isTerminated())
            {
                service.shutdownNow();
            }
            client.close();
        }
        System.out.println("OK!");
    }
}
修改成功: true
修改之前的值:0 | 修改之后的值:1
修改成功: true
修改之前的值:1 | 修改之后的值:2
修改成功: true
修改之前的值:2 | 修改之后的值:3
修改成功: true
修改之前的值:3 | 修改之后的值:4
修改成功: true
修改之前的值:4 | 修改之后的值:5
OK!
标签:
原文地址:http://www.cnblogs.com/LiZhiW/p/4941771.html