标签:concurrent java 阻塞队列 线程安全 arrayblockingqueue
通过这个类的名字,可以知道ArrayBlockingQueue是一个底层使用数组实现,具有队列特点的先进先出以及线程安全的一个集合类,他还可以实现指定时间的阻塞读写,也就是可以解决生产者消费者问题的阻塞队列。
首先来看一下它的构造方法:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw newIllegalArgumentException();
this.items = newObject[capacity];
lock = newReentrantLock(fair);
notEmpty =lock.newCondition();
notFull = lock.newCondition();
}
这是前两个构造方法,可以发现一个问题,就是它没有默认构造器,传入的参数即为创建的对象数组的大小,同时初始化锁和这个锁上的两个Condition,一个为 notEmpty,用作队列空时,进行take操作的等待;另一个为notFull,作用是队列满时,put操作的等待。
首先来看一下offer方法,offer方法有两个重载的实现, 用于插入元素至数组的尾部。
先看一下不带参数的offer方法:
lock.lock();
try {
if (count ==items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
其中的count是队列中的元素的个数,items就是存值的数组,在数组满的情况下则不进入等待,而是直接返回 false。不满的情况下,会进入insert方法,会执行notEmpty.signal();,唤醒再队列为空时take操作等待的线程
它还有一个有三个参数的重载方法,如数组已满,则进入等待,直到以下三种情况时才继续:被唤醒、到达指定的时间或当前线程被中断(interrupt)。此方法首先将指定的时间转换为纳秒,然后执行加锁操作,如数组未满,则将对象插入数组,如数组已满,且已超过指定的时间,则返回 false;如未超过指定的时间,则调用 notFull condition 的 awaitNanos 方法进行等待,如为被唤醒或超时,则继续判断数组是否已满;如线程被 interrupt,则直接抛出 InterruptedException。
再来看一下put 方法
具体的代码如下:
public void put(E e) throwsInterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
可以看到,这个方法在数组已满的情况下会一直等待,直到数组不为空或线程被 interrupt。
接下来再来看一下poll方法,此方法用于获取队列中的第一个元素。和offer一样,也有两个重载。
public E poll() {
final ReentrantLocklock = this.lock;
lock.lock();
try {
return (count ==0) ? null : extract();
} finally {
lock.unlock();
}
}
这是不带时间参数的 poll 方法,它会在在数组中元素个数为零的情况下则不进入等待,而是直接返回 null,不为空的情况下执行extract()方法,执行notFull.signal();唤醒put阻塞的线程。
还有一个是有三个参数的poll(E,long,TimeUnit)方法。如队列中没有元素,则进入等待,与 offer(E,long,TimeUnit)相同,它也是在三种情况后继续。 首先将指定的时间转化为纳秒,并进行加锁,如数组中的元素个数不为零,则从当前的对象数组中获取最后一个元素,在获取后将该位置上的元素设置为 null;如数组中的元素个数为零,首先判断剩余的等待时间是否小于零,如小于则返回 null,如大于则调用 notEmpty condition 的 awaitNanos 方法进行等待,如为被唤醒或超时,则继续判断数组中元素个数是否不为零;如线程被 interrupt,则直接抛出 interruptedException。
最后再看一下take方法,源码如下:
publicE take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
可以看到,调用 take 方法,此方法在数组为空的情况下会一直等待,直到数组不为空或线程被 interrupt。再调用extract方法,取出队首的元素。
标签:concurrent java 阻塞队列 线程安全 arrayblockingqueue
原文地址:http://blog.csdn.net/qfycc92/article/details/45501391