标签:否则 使用场景 read rup 获取 except 两种 ace 连接
阻塞队列是 java.util.concurrent 包提供的一个类,该类提供了多线程中通过队列实现安全高效的数据处理的功能。
所谓阻塞队列,是在普通队列基础上实现了阻塞线程的功能:
以下是阻塞队列实现阻塞线程的两种常用场景:
阻塞队列提供的方法:
插入方法:
1. boolean add(E e):队列没有满,则插入数据并返回true;队列满时,抛出异常 java.lang.IllegalStateException: Queue full。
2. boolean offer(E e):队列没有满,则插入数据并返回true;队列满时,返回false。
3. void put(E e):队列没有满,则插入数据;队列满时,阻塞调用此方法线程,直到队列有空闲空间时此线程进入就绪状态。
4. boolean offer(E e, long timeout, TimeUnit unit):队列没有满,插入数据并返回true;队列满时,阻塞调用此方法线程,若指定等待的时间内还不能往队列中插入数据,返回false。
移除方法:
1. E remove():队列非空,则以FIFO原则移除数据,并返回该数据的值;队列为空,抛出异常 java.util.NoSuchElementException。
2. E poll():队列非空,移除数据,并返回该数据的值;队列为空,返回null。
3. E take():队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,直到队列为非空时此线程进入就绪状态。
4. E poll(long timeout, TimeUnit unit):队列非空,移除数据,并返回该数据的值;队列为空,阻塞调用此方法线程,若指定等待的时间内队列都没有数据可取,返回null。
检查方法:
1. E element():队列非空,则返回队首元素;队列为空,抛出异常 java.util.NoSuchElementException。
2. E peek():队列非空,则返回队首元素;队列为空,返回null。
获取所有成员的方法:
1. int drainTo(Collection<? super E> c):一次性从BlockingQueue获取所有可用的数据对象存入集合中。
2. int drainTo(Collection<? super E> c, int maxElements):从BlockingQueue获取指定数据的个数的对象存入集合中。
JDK提供的阻塞队列:
1. ArrayBlockingQueue :一个由数组结构实现的有界阻塞队列。
ArrayBlockingQueue内部,维护了一个定长数组。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下对象内部采用非公平锁,所谓公平锁是指阻塞的所有生产者线程(或消费者线程),当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素(先阻塞的消费者线程,可以先从队列里获取元素)。通常情况下为了保证公平性会降低吞吐量。
2. LinkedBlockingQueue :一个由链表结构实现的有界阻塞队列。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出的原则对元素进行排序。
3. DelayQueue:一个使用优先级队列实现的无界阻塞队列。
DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。使用场景:常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4. PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
5. SynchronousQueue:一个不存储元素的阻塞队列。
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
6. LinkedTransferQueue:一个由链表结构实现的无界阻塞队列。
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。
7. LinkedBlockingDeque:一个由链表结构实现的双向阻塞队列。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
阻塞队列常用场景是 “生产者—消费者” 模式,以下是一个生产者不断生产随机数据存入队列,消费者不断获取的实例:
import java.util.Random; import java.util.concurrent.*; public class BlockingQueueTest1 { /** * 生产者 */ public static class Producer implements Runnable { /** * 阻塞队列 */ private BlockingQueue<Integer> blockingQueue; /** * 判断是否循环 */ private boolean isRunning = true; /** * 随机数据范围 */ private static final int RANGE_FOR_DATA = 1000; private Random random = new Random(); public Producer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (isRunning) { try { /** 生产出随机数 */ int data = random.nextInt(RANGE_FOR_DATA); System.out.println(Thread.currentThread().getName() + " 生产数据:" + data); /** 将随机数放入阻塞队列 */ blockingQueue.put(data); System.out.println(Thread.currentThread().getName() + " 插入队列:" + data); /** 进行随机时间休眠 */ Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { System.out.println("程序结束啦,我不用再等待阻塞队列有空余位置了!"); } } } /** * 终止生产线程 */ public void shutDown() { isRunning = false; } } /** * 消费者 */ public static class Consumer implements Runnable { /** * 阻塞队列 */ private BlockingQueue<Integer> blockingQueue; /** * 判断是否循环 */ private boolean isRunning = true; /** * 随机数据范围 */ private Random random = new Random(); public Consumer(BlockingQueue<Integer> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (isRunning) { try { /** 从阻塞队列中获取随机数 */ int data = (int) blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " 消费数据:" + data); /** 进行随机时间休眠 */ Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { System.out.println("程序结束啦,我不用再等待阻塞队列非空了!"); } } } /** * 终止消费线程 */ public void shutDown() { isRunning = false; } } public static void main(String[] args) { /** 创建容量大小为5的阻塞队列 */ BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5); /** 创建连接池 */ ExecutorService pool = Executors.newCachedThreadPool(); /** 创建生产线程,消费线程各5个 */ Producer[] producers = new Producer[5]; Consumer[] consumers = new Consumer[5]; /** 实例化生产线程与消费线程并且执行线程 */ for (int i = 0; i < producers.length; i++) { producers[i] = new Producer(blockingQueue); consumers[i] = new Consumer(blockingQueue); pool.execute(producers[i]); pool.execute(consumers[i]); } try { /** 等待5秒后进行手动中断 */ Thread.sleep(5 * 1000); for (int i = 0; i < producers.length; i++) { producers[i].shutDown(); consumers[i].shutDown(); } /** 其实提不提醒线程关闭都一个样了,阻塞的线程,不会因为手动中断而中断的 */ pool.shutdown(); /** 等待2秒,若还有线程没有关闭则强行中断所有等待线程 */ if (!pool.awaitTermination(2 * 1000, TimeUnit.MILLISECONDS)) { /** 超时的时候向线程池中所有的线程发出中断 */ pool.shutdownNow(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
标签:否则 使用场景 read rup 获取 except 两种 ace 连接
原文地址:https://www.cnblogs.com/JimKing/p/9067821.html