标签:https ems locking 插入 dad shu extends try str
1.Java 通过阻塞队列实现生产者消费者模式
提供的方法:
插入元素:
移除元素:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接。
关于 通过 Lock 和 竞争条件 Condition 实现生产者消费者模式,可以参考 链接。
利用阻塞队列实现生产者消费者模式,代码如下:
public class BlockingQueue_Test {
private static final int MAX_CAPACITY = 10;
private static ArrayBlockingQueue<Object> goods = new ArrayBlockingQueue<Object>(MAX_CAPACITY);
public static void main(String[] args) {
(new ProducerThread()).start();
(new ConsumerThread()).start();
}
static class ProducerThread extends Thread {
public void run() {
while (true) {
// 每隔 1000 毫秒生产一个商品
try {
Thread.sleep(1000);
goods.put(new Object());
System.out.println("Produce goods, total: " + goods.size());
} catch (InterruptedException e) {
}
}
}
}
static class ConsumerThread extends Thread {
public void run() {
while (true) {
// 每隔 500 毫秒消费一个商品
try {
Thread.sleep(500);
goods.take();
System.out.println("Consume goods, total: " + goods.size());
} catch (InterruptedException e) {
}
}
}
}
}
以 ArrayBlockingQueue
为例,实际上使用了 ReentrantLock 和 Condition。
构造方法:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
插入元素,如果队列已满,则阻塞 notFull.await();:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
移除元素,如果队列已空,则阻塞 notEmpty.await();:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
obj.wait():
obj.notify():唤醒 一个 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态
obj.notifyAll():唤醒 所有 正在 Waiting Pool 中等待该对象的线程进入 Waiting for monitor entry 状态
线程进入 Waiting for monitor entry 状态后,一旦该对象被解锁,这些线程就会去竞争。
synchronized(obj) {
while(some condition) {
try {
obj.wait();
} catch(...) {...}
}
}
代码如下:
public class ProducerCunsumer_Test {
private static final int MAX_CAPACITY = 10;
private static List<Object> goods = new ArrayList<Object>();
public static void main(String[] args) {
(new ProducerThread()).start();
(new ConsumerThread()).start();
}
static class ProducerThread extends Thread {
public void run() {
while (true) {
// 每隔 1000 毫秒生产一个商品
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
synchronized (goods) {
// 当前商品满了,生产者等待
if (goods.size() == MAX_CAPACITY) {
try {
System.out.println("Goods full, waiting...");
goods.wait();
} catch (Exception e) {
}
}
goods.add(new Object());
System.out.println("Produce goods, total: " + goods.size());
// goods.notify() 也可以
goods.notifyAll();
}
}
}
}
static class ConsumerThread extends Thread {
public void run() {
while (true) {
// 每隔 500 毫秒消费一个商品
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
synchronized (goods) {
// 当前商品空了,消费者等待
if (goods.size() == 0) {
try {
System.out.println("No goods, waiting...");
goods.wait();
} catch (Exception e) {
}
}
goods.remove(0);
System.out.println("Consume goods, total: " + goods.size());
// goods.notify() 也可以
goods.notifyAll();
}
}
}
}
}
在上面的代码中,消费商品的速度(500毫秒)快于生产商品的速度(1000毫秒),依次输出如下所示:
可以看出,商品队列经常处于空的状态。
No goods, waiting...
Produce goods, total: 1
Consume goods, total: 0
No goods, waiting...
Produce goods, total: 1
Consume goods, total: 0
No goods, waiting...
Produce goods, total: 1
Consume goods, total: 0
如果修改,使得消费商品的速度(500毫秒)慢于生产商品的速度(100毫秒),依次输出如下所示:
可以看出,商品队列经常处于满的状态。
Produce goods, total: 1
Produce goods, total: 2
Produce goods, total: 3
Produce goods, total: 4
Produce goods, total: 5
Consume goods, total: 4
Produce goods, total: 5
Produce goods, total: 6
Produce goods, total: 7
Produce goods, total: 8
Consume goods, total: 7
Produce goods, total: 8
Produce goods, total: 9
Produce goods, total: 10
Goods full, waiting...
Consume goods, total: 9
Produce goods, total: 10
Goods full, waiting...
Consume goods, total: 9
Produce goods, total: 10
Goods full, waiting...
Consume goods, total: 9
Produce goods, total: 10
Goods full, waiting...
多个线程共享对某些变量的访问,其最后结果取决于哪个线程偶然在竞争中获胜。
condition.await()
:类似于 obj.wait()
condition.signal()
:类似于 obj.notify()
condition.signalAll()
:类似于 obj.notifyAll()
可以建立不同的多个 Condition,针对不同的竞争条件,例如:
Condition isFullCondition = lock.newCondition();
Condition isEmptyCondition = lock.newCondition();
关于 通过 wait 和 notify 实现生产者消费者模式,可以参考 链接。
利用 Lock 和 竞争条件 Condition 也可以实现生产者消费者模式,代码如下:
public class Condition_Test {
private static final int MAX_CAPACITY = 10;
private static List<Object> goods = new ArrayList<Object>();
private static final Lock lock = new ReentrantLock();
private static final Condition isFullCondition = lock.newCondition();
private static final Condition isEmptyCondition = lock.newCondition();
public static void main(String[] args) {
(new ProducerThread()).start();
(new ConsumerThread()).start();
}
static class ProducerThread extends Thread {
public void run() {
while (true) {
// 每隔 1000 毫秒生产一个商品
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// 获得锁,相当于 synchronized
lock.lock();
// 当前商品满了,生产者等待
if (goods.size() == MAX_CAPACITY) {
try {
System.out.println("Goods full, waiting...");
isEmptyCondition.await();
} catch (Exception e) {
}
}
goods.add(new Object());
System.out.println("Produce goods, total: " + goods.size());
// isFullCondition.signal() 也可以
isFullCondition.signalAll();
// 记住要释放锁
lock.unlock();
}
}
}
static class ConsumerThread extends Thread {
public void run() {
while (true) {
// 每隔 500 毫秒消费一个商品
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
// 获得锁,相当于 synchronized
lock.lock();
// 当前商品空了,消费者等待
if (goods.size() == 0) {
try {
System.out.println("No goods, waiting...");
isFullCondition.await();
} catch (Exception e) {
}
}
goods.remove(0);
System.out.println("Consume goods, total: " + goods.size());
// isEmptyCondition.signal() 也可以
isEmptyCondition.signalAll();
// 记住要释放锁
lock.unlock();
}
}
}
}
Condition
的内部实现是使用节点链来实现的,每个条件实例对应一个节点链,我们有 isFullCondition
和 isEmptyCondition
两个条件实例,所以会有两个等待节点链。当对应条件被 signal
的时候,就会把等待节点转移到同步队列中,继续竞争锁。
标签:https ems locking 插入 dad shu extends try str
原文地址:https://www.cnblogs.com/cxhfuujust/p/10880558.html