标签:square 计数器 i++ Fix service 微信 指定 over wait方法
转:
本文主要介绍和对比我们常用的几种并发工具类,主要涉及 CountDownLatch
、 CyclicBarrier
、 Semaphore
、 Exchanger
相关的内容,如果对多线程相关内容不熟悉,可以看笔者之前的一些文章:
CountDownLatch
、CyclicBarrier
两者的使用与区别,他们都是等待多线程完成,是一种并发流程的控制手段,Semaphore
、Exchanger
的使用,semaphore
是信号量,可以用来控制允许的线程数,而 Exchanger
可以用来交换两个线程间的数据。CountDownLatch
是 JDK5
之后加入的一种并发流程控制工具,它在 java.util.concurrent
包下CountDownLatch
允许一个或多个线程等待其他线程完成操作,这里需要注意,是可以是一个等待也可以是多个来等待CountDownLatch
的构造函数如下,它接受一个 int
类型的参数作为计数器,即如果你想等待N
个线程完成,那么这里就传入 N
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
countDown
与 await
,其中 当我们调用 countDown
方法时相应的 N
的值减 1,而 await
方法则会阻塞当前线程,直到 N
的值变为零。CountDownLatch
来实现这一案例,那么等待的个数 N
就是上面的裁判线程的个数,即为 1,
/**
* @url i-code.onlien
* 云栖简码
*/
public static void main(String[] args) throws InterruptedException {
//模拟跑步比赛,裁判说开始,所有选手开始跑,我们可以使用countDownlatch来实现
//这里需要等待裁判说开始,所以时等着一个线程
CountDownLatch countDownLatch = new CountDownLatch(1);
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() +"已准备");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"开始跑~~");
},"选手1").start();
new Thread(() ->{
try {
System.out.println(Thread.currentThread().getName() +"已准备");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"开始跑~~");
},"选手2").start();
TimeUnit.SECONDS.sleep(1);
System.out.println("裁判:预备~~~");
countDownLatch.countDown();
System.out.println("裁判:跑~~~");
}
在上述代码中,我们首先创建了一个计数为1 的CountDownLatch
对象,这代表我们需要等待的线程数,之后再创建了两个线程,用来代表选手线程,同时在选手的线程中我们都调用了await
方法,让线程进入阻塞状态,直到CountDownLatch的计数为零后再执行后面的内容,在主线程main
方法中我们等待 1秒后执行countDown
方法,这个方法就是减一,此时的N
则为零了,那么选手线程则开始执行后面的内容,整体的输出如上图所示
CountDownLatch
来实现,那么的计数个数 N
则为5,因为要等待这五个,通过代码实现如下:
public static void main(String[] args) throws InterruptedException {
/**
* i-code.online
* 云栖简码
*/
//等待的个数
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "从住所出发...");
try {
TimeUnit.SECONDS.sleep((long) (Math.random()*10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 到达目的地-----");
countDownLatch.countDown();
},"人员-"+i).start();
}
System.out.println("大巴正在等待人员中.....");
countDownLatch.await();
System.out.println("-----所有人到齐,出发-----");
}
从上述代码中我们可以看到,定义了一个计数为5的countDownLatch
,之后通过循环创建五个线程,模拟五个人员,当他们到达指定地点后执行countDown
方法,对计数减一。主线程相当于是大巴车的线程,执行await
方法进行阻塞,只有当N
的值减到0后则执行后面的输出
public CountDownLatch(int count) { };
它的构造函数是传入一个参数,该参数 count
是需要倒数的数值。
await()
:调用 await()
方法的线程开始等待,直到倒数结束,也就是 count
值为 0
的时候才会继续执行。await(long timeout, TimeUnit unit)
:await()
有一个重载的方法,里面会传入超时参数,这个方法的作用和 await()
类似,但是这里可以设置超时时间,如果超时就不再等待了。countDown()
:把数值倒数 1
,也就是将 count
值减 1
,直到减为 0
时,之前等待的线程会被唤起。上面的案例介绍了CountDownLatch
的使用,但是CountDownLatch
有个特点,那就是不能够重用,比如已经完成了倒数,那可不可以在下一次继续去重新倒数呢?是可以的,一旦倒数到0 则结束了,无法再次设置循环执行,但是我们实际需求中有很多场景中需要循环来处理,这时候我们可以使用CyclicBarrier
来实现
CyclicBarrier
与 CountDownLatch
比较相似,当等待到一定数量的线程后开始执行某个任务CyclicBarrier
的字面意思是可以循环使用的屏障,它的功能就是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开会,此时所有被屏障阻塞的线程都将继续执行。如下演示 public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
CyclicBarrier(int parties)
构造函数提供了int
类型的参数,代表的是需要拦截的线程数量,而每个线程通过调用 await
方法来告诉 CyclicBarrier
我到达屏障点了,然后阻塞CyclicBarrier(int parties, Runnable barrierAction)
构造函数是为我们提供的一个高级方法,加了一个 barrierAction
的参数,这是一个Runnable
类型的,也就是一个线程,它表示当所有线程到达屏障后,悠闲触发 barrierAction
线程执行,再执行各个线程之后的内容CyclicBarrier
的来实现,这里我们需要来拦截的线程就是两个。具体实现 如下: /*
CyclicBarrier 与countDownLatch 比较相似,也是等待线程完成,
不过countDownLatch 是await等待其他的线程通过countDown的数量,达到一定数则执行,
而 CyclicBarrier 则是直接看await的数量,达到一定数量直接全部执行,
*/
public static void main(String[] args) {
//好比情侣约会,不管谁先到都的等另一个,这里就是两个线程,
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(() ->{
System.out.println("快速收拾,出门~~~");
try {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("到了约会地点等待女朋友前来~~");
cyclicBarrier.await();
System.out.println("女朋友到来嗨皮出发~~约会");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"男朋友").start();
new Thread(() ->{
System.out.println("慢慢收拾,出门~~~");
try {
TimeUnit.MILLISECONDS.sleep(5000);
System.out.println("到了约会地点等待男朋友前来~~");
cyclicBarrier.await();
System.out.println("男朋友到来嗨皮出发~~约会");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
},"女朋友").start();
}
上面代码,相对简单,创建一个拦截数为2的屏障,之后创建两个线程,调用await方法,只有当调用两次才会触发后面的流程。
Runnable
参数的构造函数;和之前 CountDownLatch
的案例相似,公司组织出游,这时候肯定有很多大巴在等待接送,大巴不会等所有的 人都到才出发,而是每坐满一辆车就出发一辆,这种场景我们就可以使用 CyclicBarrier
来实现,实现如下:
/*
CyclicBarrier是可重复使用到,也就是每当几个满足是不再等待执行,
比如公司组织出游,安排了好多辆大把,每坐满一辆就发车,不再等待,类似这种场景,实现如下:
*/
public static void main(String[] args) {
//公司人数
int peopleNum = 2000;
//每二十五个人一辆车,凑够二十五则发车~
CyclicBarrier cyclicBarrier = new CyclicBarrier(25,() ->{
//达到25人出发
System.out.println("------------25人数凑齐出发------------");
});
for (int j = 1; j <= peopleNum; j++) {
new Thread(new PeopleTask("People-"+j,cyclicBarrier)).start();
}
}
static class PeopleTask implements Runnable{
private String name;
private CyclicBarrier cyclicBarrier;
public PeopleTask(String name,CyclicBarrier cyclicBarrier){
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(name+"从家里出发,正在前往聚合地....");
try {
TimeUnit.MILLISECONDS.sleep(((int) Math.random()*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+"到达集合地点,等待其他人..");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
相同点:
不同点:
CountDownLatch
的计数器只能使用一次,到到达0后就不能再次使用了,除非新建实例;而 CyclicBarrier
的计数器是可以复用循环的,所以 CyclicBarrier
可以用在更复杂的场景,可以随时调用 reset
方法来重制拦截数,如计算发生错误时可以直接充值计数器,让线程重新执行一次。CyclicBarrier
要等固定数量的线程都到达了屏障位置才能继续执行,而 CountDownLatch
只需等待数字倒数到 0
,也就是说 CountDownLatch
作用于事件,但 CyclicBarrier
作用于线程;CountDownLatch
是在调用了 countDown
方法之后把数字倒数减 1
,而 CyclicBarrier
是在某线程开始等待后把计数减 1
。CyclicBarrier
有执行动作 barrierAction
,而 CountDownLatch
没这个功能。Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,acquire
方法)。线程可以从信号量中去“获取”一个许可证,一旦线程获取之后,信号量持有的许可证就转移过去了,所以信号量手中剩余的许可证要减一。release
方法),这个许可证相当于被归还给信号量了,于是信号量中的许可证的可用数量加一。当信号量拥有的许可证数量减到 0 时,如果下个线程还想要获得许可证,那么这个线程就必须等待,直到之前得到许可证的线程释放,它才能获取。由于线程在没有获取到许可证之前不能进一步去访问被保护的共享资源,所以这就控制了资源的并发访问量,这就是整体思路。IO
操作,我们可以启动很多线程但是数据库的连接池是有限制的,假设我们设置允许五个链接,如果我们开启太多线程直接操作则会出现异常,这时候我们可以通过信号量来控制,让一直最多只有五个线程来获取连接。代码如下: /*
Semaphore 是信号量, 可以用来控制线程的并发数,可以协调各个线程,以达到合理的使用公共资源
*/
public static void main(String[] args) {
//创建10个容量的线程池
final ExecutorService service = Executors.newFixedThreadPool(100);
//设置信号量的值5 ,也就是允许五个线程来执行
Semaphore s = new Semaphore(5);
for (int i = 0; i < 100; i++) {
service.submit(() ->{
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("数据库耗时操作"+Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "正在执行....");
s.release();
});
}
}
如上代码,创建了一个容量100的线程池,模拟我们程序中大量的线程,添加一百个任务,让线程池执行。创建了一个容量为5的信号量,在线程中我们调用acquire
来获得信号量的许可,只有获得了才能只能下面的内容不然阻塞。当执行完后释放该许可,通过release
方法,
private static int count = 0;
/*
Semaphore 中如果我们允许的的许可证数量为1 ,那么它的效果与锁相似。
*/
public static void main(String[] args) throws InterruptedException {
final ExecutorService service = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(1);
for (int i = 0; i < 10000; i++) {
service.submit(() ->{
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行了");
count ++;
semaphore.release();
});
}
service.shutdown();
TimeUnit.SECONDS.sleep(5);
System.out.println(count);
}
public boolean tryAcquire()
:tryAcquire
和锁的 trylock
思维是一致的,是尝试获取许可证,相当于看看现在有没有空闲的许可证,如果有就获取,如果现在获取不到也没关系,不必陷入阻塞,可以去做别的事。public boolean tryAcquire(long timeout, TimeUnit unit)
:是一个重载的方法,它里面传入了超时时间。比如传入了 3 秒钟,则意味着最多等待 3 秒钟,如果等待期间获取到了许可证,则往下继续执行;如果超时时间到,依然获取不到许可证,它就认为获取失败,且返回 false。int availablePermits()
:返回此信号量中当前可用的许可证数int getQueueLength()
:返回正在等待许可证的线程数boolean hasQueuedThreads()
:判断是否有线程正在等待获取许可证void reducePermits(int reduction)
:减少 reduction
个许可证,是个 protected
方法Collection getQueuedThreads()
:返回正在等待获取许可证的线程集合,是个 protected
方法Exchanger
(交换者)是一个用于线程间协作的工具类,它主要用于进行线程间数据的交换,它有一个同步点,当两个线程到达同步点时可以将各自的数据传给对方,如果一个线程先到达同步点则会等待另一个到达同步点,到达同步点后调用 exchange
方法可以传递自己的数据并且获得对方的数据。public class ExchangerTest {
/*
Exchanger 交换, 用于线程间协作的工具类,可以交换线程间的数据,
其提供一个同步点,当线程到达这个同步点后进行数据间的交互,遗传算法可以如此来实现,
以及校对工作也可以如此来实现
*/
public static void main(String[] args) {
/*
模拟 两个工作人员录入记录,为了防止错误,两者录的相同内容,程序仅从校对,看是否有错误不一致的
*/
//开辟两个容量的线程池
final ExecutorService service = Executors.newFixedThreadPool(2);
Exchanger exchanger = new Exchanger<>();
service.submit(() ->{
//模拟数据 线程 A的
InfoMsg infoMsg = new InfoMsg();
infoMsg.content="这是线程A";
infoMsg.id ="10001";
infoMsg.desc = "1";
infoMsg.message = "message";
System.out.println("正在执行其他...");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
final InfoMsg exchange = exchanger.exchange(infoMsg);
System.out.println("线程A 交换数据====== 得到"+ exchange);
if (!exchange.equals(infoMsg)){
System.out.println("数据不一致~~请稽核");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.submit(() ->{
//模拟数据 线程 B的
InfoMsg infoMsg = new InfoMsg();
infoMsg.content="这是线程B";
infoMsg.id ="10001";
infoMsg.desc = "1";
infoMsg.message = "message";
System.out.println("正在执行其他...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
final InfoMsg exchange = exchanger.exchange(infoMsg);
System.out.println("线程B 交换数据====== 得到"+ exchange);
if (!exchange.equals(infoMsg)){
System.out.println("数据不一致~~请稽核");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.shutdown();
}
static class InfoMsg{
String id;
String name;
String message;
String content;
String desc;
@Override
public String toString() {
return "InfoMsg{" +
"id=‘" + id + ‘‘‘ +
", name=‘" + name + ‘‘‘ +
", message=‘" + message + ‘‘‘ +
", content=‘" + content + ‘‘‘ +
", desc=‘" + desc + ‘‘‘ +
‘}‘;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InfoMsg infoMsg = (InfoMsg) o;
return Objects.equals(id, infoMsg.id) &&
Objects.equals(name, infoMsg.name) &&
Objects.equals(message, infoMsg.message) &&
Objects.equals(content, infoMsg.content) &&
Objects.equals(desc, infoMsg.desc);
}
@Override
public int hashCode() {
return Objects.hash(id, name, message, content, desc);
}
}
}
上面代码运行可以看到,当我们线程A/B
到达同步点即调用exchange
后进行数据的交换,拿到对方的数据再与自己的数据对比可以做到稽核 的效果
Exchanger
同样可以用于遗传算法中,选出两个对象进行交互两个的数据通过交叉规则得到两个混淆的结果。Exchanger
中嗨提供了一个方法 public V exchange(V x, long timeout, TimeUnit unit)
主要是用来防止两个程序中一个一直没有执行 exchange
而导致另一个一直陷入等待状态,这是可以用这个方法,设置超时时间,超过这个时间则不再等待。本文由AnonyStar 发布,可转载但需声明原文出处。
欢迎关注微信公账号 :云栖简码 获取更多优质文章
更多文章关注笔者博客 :云栖简码 i-code.online
转:
CountDownLatch、CyclicBarrier、Semaphore、Exchanger 的详细解析
标签:square 计数器 i++ Fix service 微信 指定 over wait方法
原文地址:https://www.cnblogs.com/wangtcc/p/14477558.html