1 定义
虚假唤醒,即spurious wakeups。wait需要在while循环内使用,原因就是因为存在虚假唤醒。
2 Monitor
还是放上这个神图来复习下线程间通信
- 线程在竞争锁失败的情况下会放到Entry Set中,图中2表示线程可以获取锁
- 获取到锁的线程可以调用wait方法,让线程阻塞,此时线程被放到了Wait Set中,如图中3所示;Wait Set中的线程在时间到或者被notify后可以竞争锁,如图中4所示
- Wait Set中的线程在获取到锁后才可以继续执行。
- notify会唤醒Wait Set中的一个线程来竞争锁,notifyAll会唤醒Wait Set中全部的线程,但是只有一个线程能获取到锁,每个线程会依次去获取锁,运行一次。
3 虚假唤醒
我们以生产者消费者问题来举例几种情况,假设在wait在if中而不是在while中
1)情况1 稍微复杂点
- 有一个生产者p,两个消费者c1、c2,一个队列queue
- c1先执行,由于queue中为0,所以c1调用wait线程阻塞,线程放到了Wait Set中
- p生产了一个消费,放到queue中
- 在p调用notify之前,c2开始执行,需要竞争queue的锁,所以c2在Entry Set等待竞争锁
- p生产完成,调用notify,c1收到被唤醒后,从Wait Set竞争锁,注意此时c2也在竞争锁。
- c2从Entry Set先竞争到锁,然后消费了queue中的消息,此时queue大小为0
- c2执行完后,释放锁,此时c1竞争到锁,从queue中消费消息,由于queue目前大小为0,所以从queue为0的队列中访问是非法的。
2)情况2 稍微简单点
- 有一个生产者p,两个消费者c1、c2,一个队列queue
- c1、c2先启动,由于queue是空,所以分别调用wait,c1、c2都进入Wait Set
- 之后p生产了一个消息到queue中,然后调用notifyall,c1和c2都被唤醒。
- c1竞争到锁,消费一个消息,queue大小为0,完成后释放锁
- c2竞争到锁,消费一条消息,queue大小是-1,抛出异常。
3)情况3
- 假设有两种情况会引起消费者阻塞
- c1是由于条件1,调用了wait;c2是由于条件2调用了wait;
- p生产了一条消息,然后满足了条件1,调用notify,却唤醒了c2
- 由于是使用的if,c2没有再判断是不是条件2被满足了,所以就直接获取到锁,造成错误
4)情况4
- wait可能被interrupt等不被唤醒就继续执行
4 举例
我们以情况2来举例
public class ProducerConsumer {
public static void main(String[] args) throws Exception{
new ProducerConsumer().test();
}
public void test() throws InterruptedException {
Queue<Integer> queue = new LinkedList<>();
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, 0);
Consumer consumer1 = new Consumer(queue, 1);
ExecutorService executor = Executors.newFixedThreadPool(6);
executor.submit(consumer);
executor.submit(consumer1);
Thread.sleep(5000);
executor.submit(producer);
}
class Producer implements Runnable {
private Queue<Integer> queue;
public Producer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run(){
try {
synchronized (queue) {
Integer time = new Random().nextInt(100);
queue.add(time);
System.out.println("producer notifyall");
queue.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private Queue<Integer> queue;
private int index;
public Consumer(Queue<Integer> queue, int i) {
this.queue = queue;
this.index = i;
}
@Override
public void run() {
try {
Thread.currentThread().setName("consumerThread_" + index);
while (true) {
synchronized (queue) {
if (queue.size() <= 0) {
try {
System.out.println("consumer wait:" + Thread.currentThread().getName());
queue.wait();
System.out.println("consumer wait2:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("gogogo:" + Thread.currentThread().getName());
Integer time = new Random().nextInt(100);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer remove: " + queue.remove() + ": " + Thread.currentThread().getName());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
运行结果
consumer wait:consumerThread_0
consumer wait:consumerThread_1
producer notifyall
consumer wait2:consumerThread_1
gogogo:consumerThread_1
consumer remove: 99: consumerThread_1
consumer wait:consumerThread_1
consumer wait2:consumerThread_0
gogogo:consumerThread_0
java.util.NoSuchElementException
at java.util.LinkedList.removeFirst(LinkedList.java:270)
at java.util.LinkedList.remove(LinkedList.java:685)
at com.hxy.ProducerConsumer$Consumer.run(ProducerConsumer.java:85)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
上述代码中,两个消费者先启动,由于queue.size 小于等于0 ,所以两个消费者都调用wait,放到了Wait Set中;5s后生产者往队列里发送了一条消息,然后调用notifyAll,两个消费者都被唤醒,由于没有再判断是否满足条件,所以分别获取锁去消费,造成第二个消费者抛出NoSuchElementException异常。
将上述代码中消费者的if判断修改为while便会正常,运行结果如下:
consumer wait:consumerThread_0
consumer wait:consumerThread_1
producer notifyall
consumer wait2:consumerThread_1
gogogo:consumerThread_1
consumer remove: 53: consumerThread_1
consumer wait:consumerThread_1
consumer wait2:consumerThread_0
consumer wait:consumerThread_0
consumerThread_0竞争到锁后,会从while判断queue.size大小,由于queue大小为0,所以继续wait。
另外,可能会想到,在while使用wait,对于情况3这种情况,如果调用notifyall,会唤醒其他不相关的线程,而这些线程需要重新判断,然后再调用wait,这显然是一种资源浪费。针对这种情况,我们可以使用Condition,只唤醒相关的线程。
5 生产者消费者
下面附上正确的消费者生产者
public class ProducerConsumer {
public static void main(String[] args) throws Exception{
new ProducerConsumer().test();
}
public void test() throws InterruptedException {
Queue<Integer> queue = new LinkedList<>();
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue, 0);
Consumer consumer1 = new Consumer(queue, 1);
ExecutorService executor = Executors.newFixedThreadPool(6);
executor.submit(consumer);
executor.submit(consumer1);
executor.submit(producer);
}
class Producer implements Runnable {
private Queue<Integer> queue;
public Producer(Queue<Integer> queue) {
this.queue = queue;
}
@Override
public void run(){
while (true) {
try {
synchronized (queue) {
while (queue.size() >= 10) { // 防止虚假唤醒
queue.wait();
}
Integer time = new Random().nextInt(100);
Thread.sleep(time);
System.out.println("producer add:" + time);
queue.add(time);
queue.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private Queue<Integer> queue;
private int index;
public Consumer(Queue<Integer> queue, int i) {
this.queue = queue;
this.index = i;
}
@Override
public void run() {
try {
Thread.currentThread().setName("consumerThread_" + index);
while (true) {
synchronized (queue) {
while (queue.size() <= 0) { // 防止虚假唤醒
queue.wait();
}
Integer time = new Random().nextInt(100);
Thread.sleep(time);
System.out.println("consumer remove: " + queue.remove() + ": " + Thread.currentThread().getName());
queue.notifyAll(); // 也有可能唤醒另一个consumer中的queue.wait
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}