标签:默认 代码示例 nec compareto tee alt ali 参数 数组
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空
阻塞队列一共有7种,我们着重讲一下
ArrayBlockingQueue ,
LinkedBlockingQueue ,
DelayQueue,
SynchronousQueue
这四种阻塞队列
基于数组实现有界的阻塞队列(循环数组)
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
private static final long serialVersionUID = -817911632652898426L; final Object[] items; //底层存储元素的数组 int takeIndex; //进行取操作时的下标 int putIndex;//进行放操作时的下标 int count;//队列中元素的数量 final ReentrantLock lock;//阻塞时用的锁 private final Condition notEmpty;//满时的condition队列 private final Condition notFull;//空时的condition队列
参数有容量和全局锁是否是公平锁
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
不用确定是否是公平锁,默认是非公平锁
public ArrayBlockingQueue(int capacity) { this(capacity, false); }
在第一个构造器的前提下,将整个集合移入阻塞队列
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
put()
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
1.首先判断添加的是否非空,是空的会抛出异常。
2.给put方法上锁
3.当集合元素数量和集合的长度相等时,调用put方法的线程将会被放入notFull队列上等待。
4.如果不相等,则enqueue(),也就是让e进入队列。
我们再看看enqueue()方法(入队方法)
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
其实就是让该元素入队,并且唤醒因为集合空而等待的线程。
take()方法同理。
LinkedBlockingQueue底层是基于链表实现的,所以其基本成员变量和LinkedList差不多。
类的继承关系
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
无参构造器,默认容量为最大容量
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
手动设定容量
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
将整个集合挪入队列中,默认容量同样是最大容量。
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
链表就一定会有节点
内部节点类
和ArrayBlockingQueue不同的是,它有两个全局锁,一个负责放元素,一个负责取元素。
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
除了节点之外。
private transient Node<E> last;//尾节点 transient Node<E> head;//头节点 private final AtomicInteger count = new AtomicInteger();//计算当前阻塞队列中的元素个数 private final int capacity;//容量 //获取并移除元素时使用的锁,如take, poll, etc private final ReentrantLock takeLock = new ReentrantLock(); //notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 private final Condition notEmpty = takeLock.newCondition(); //添加元素时使用的锁如 put, offer, etc private final ReentrantLock putLock = new ReentrantLock(); //notFull条件对象,当队列数据已满时用于挂起执行添加的线程 private final Condition notFull = putLock.newCondition();
put()方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
基本和ArrayBlockingQueue一样,只是锁的数量不同,导致有一些细微的区别。
public class TestDemo16 { private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); public static void main(String[] args) { new Thread("put"){ @Override public void run() { //添加元素 for(int i=0; i<10; i++){ System.out.println("put: "+i); try { queue.put(i); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); new Thread("take"){ @Override public void run() { //获取元素 for(int i=0; i<10; i++){ try { System.out.println("take: "+queue.take()); TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
基于PriorityQueue 延时阻塞队列,DelayQueue中的元素只有当其延时时间到达,才能够去当前队列中获取到该元素,DelayQueue是一个无界队列。主要用于缓存系统的设计、定时任务系统的设计。
实现DelayQueue的三个步骤
第一步:继承Delayed接口
第二步:实现getDelay(TimeUnit unit),该方法返回当前元素还需要延时多长时间,单位是纳秒
第三步:实现compareTo方法来指定元素的顺序
例如;
class Test implements Delayed { private long time; //Test实例延时时间 public Test(long time, TimeUnit unit){ this.time = System.currentTimeMillis() + unit.toMillis(time); } @Override public long getDelay(TimeUnit unit) { return this.time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { long diff = this.time - ((Test)o).time; if(diff <= 0){ return -1; }else{ return 1; } } }
DelayQueue<Test> queue = new DelayQueue<>(); queue.put(new Test(5, TimeUnit.SECONDS)); queue.put(new Test(10, TimeUnit.SECONDS)); queue.put(new Test(15, TimeUnit.SECONDS)); System.out.println("begin time: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); for(int i=0; i<3; i++){ try { Test test = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("time: "+LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); }
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue.
public static void main(String[] args) throws InterruptedException { SynchronousQueue queue=new SynchronousQueue(); LinkedBlockingQueue new Thread("put"){ @Override public void run() { System.out.println("put has started"); for(int i=0;i<5;i++){ System.out.println("put after takeThread"); try { queue.put((int)((Math.random() * 100) + 1)); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("put has ended"); } }.start(); new Thread("take"){ @Override public void run() { System.out.println("take has started"); for(int i=0;i<5;i++){ try { System.out.println("take from putThread"+queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("put has ended"); } }.start(); }
1:ArrayBlockingQueue和LinkedBlockingQueue的区别和联系?
1)数据存储容器不一样,ArrayBlockingQueue采用数组去存储数据、LinkedBlockingQueue采用链表去存储数据。
2)ArrayBlockingQueue(循环数组)采用数组去存储数据,不会产生额外的对象实例; LinkedBlockingQueue采用链表去存储数据,在插入和删除元素只与一个节点有关,需要去生成一个额外的Node对象,这可能长时间内需要并发处理大批量的数据,对于性能和后期GC会产生影响。
3)ArrayBlockingQueue是有界的,初始化时必须要指定容量;LinkedBlockingQueue默认是无界的,Integer.MAX_VALUE, 当添加速度大于删除速度、有可能造成内存溢出。
标签:默认 代码示例 nec compareto tee alt ali 参数 数组
原文地址:https://www.cnblogs.com/lwh1019/p/13288158.html