BlockingQueue
一、阻塞队列基本方法介绍
谈到线程池,不得不谈到生产者-消费者模式,谈到生产者-消费者,就不得不谈到对应的数据结构,谈到对应的数据结构不得不言BlockingQueue。
顾名思义,BlockingQueue翻译为阻塞队列。队列无非两种操作:入队和出队。而针对于入队出队的边界值的不同,分为几个方法:
抛出异常 |
特殊值 |
阻塞 |
超时 |
|
插入 |
||||
移除 |
||||
检查 |
不可用 |
不可用 |
测试代码:
public class QueueTest { public static void main(String args[]) { final BlockingQueue queue = new ArrayBlockingQueue(5); init(queue); System.out.println("queue.size=" + queue.size() + ", top element:" + queue.element()); // queue.add("f"); //1. add方法:队满加入抛异常 /*boolean bool = queue.offer("f"); //2. offer方法,队满加入会返回:false System.out.println("queue.size=" + queue.size() + ", 入队结果:" + bool);*/ /* try { queue.put("f"); //3.put方法,队满put会阻塞,下边的systemout方法不会执行,直至有消费者take出去 } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } System.out.println("queue.size=" + queue.size() + ", put结束:");*/ Thread thread1 = new Thread(new Runnable() { public void run() { boolean bool = false; try { bool = queue.offer("f", 5, TimeUnit.SECONDS); //5.offer带时间参数:当队满时,如果等待一定时间内还是满的就返回false,如果在这个期间队有空间了就可以放入一个元素,返回 System.out.println("queue.size=" + queue.size() + ", 入队结果:" + bool); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } }); thread1.start(); //我们开一个消费者线程做出队操作,以便上述的offer可以正常加入 Thread thread = new Thread(new Runnable() { public void run() { while (queue.size() > 0) { try { String str = (String) queue.take(); System.out.println("queue.size=" + queue.size() + ", 出队结果:" + str); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } }
} }); thread.start();
}
private static void init(BlockingQueue queue) { queue.add("a"); queue.add("b"); queue.add("c"); queue.add("d"); queue.add("e"); }
} |
add:入队,如果队列满会抛出异常。
Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:71) at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:209) |
源码:
public boolean add(E e) { if (offer(e)) return true; //如果加入成功,返回true else throw new IllegalStateException("Queue full"); //如果添加失败,抛异常 } |
Offer:入队,如果队满会返回false
boolean bool = queue.offer("f"); //offer方法会返回:false |
源码:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; //返回失败 else { insert(e); return true; //返回成功 } } finally { lock.unlock(); } } |
Put:入队,如果队满会阻塞
try { queue.put("f"); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } System.out.println("queue.size=" + queue.size() + ", put结束:" ); |
源码:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == items.length) notFull.await(); //阻塞在这儿 } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } insert(e); } finally { lock.unlock(); } } |
Offer:带时间的,会有一个缓冲时间,若超过此期间插入失败则失败。
queue.size=5, top element:a queue.size=4, 出队结果:a queue.size=5, 入队结果:true queue.size=4, 出队结果:b queue.size=3, 出队结果:c queue.size=2, 出队结果:d queue.size=1, 出队结果:e queue.size=0, 出队结果:f |
源码:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //轮询 if (count != items.length) { //如果有其他线程消费元素,队列不满了,才可以插入元素 insert(e); return true; } if (nanos <= 0)//如果到了约定时间没有插入成功,返回false return false; try { nanos = notFull.awaitNanos(nanos); //轮询减时间 } catch (InterruptedException ie) { notFull.signal(); // propagate to non-interrupted thread throw ie; } } } finally { lock.unlock(); } } |
出队的几种形式与入队对应,实例略。
从上述源码也可以看出,BlockingQueque还有几个特点:
1. 不接受null值。
2. 线程安全,方法都用了Lock
3. 最最精华的部分:生产者-消费者模型。
二、生产者-消费者模型
实例:
public class BlockingQueueTest {
private static AtomicInteger count = new AtomicInteger(0); private static AtomicInteger countCreate = new AtomicInteger(0);
public static void main(String args[]) { //定义一个阻塞队列,存放文件信息 BlockingQueue fileQueue = new ArrayBlockingQueue(5); String path = "F:\\Song"; File root = new File(path); //生产者线程去遍历文件,放入队列 FileCrawler fileCrawler = new FileCrawler(fileQueue, root); //消费者线程去遍历队列,取出文件 Indexer indexer = new Indexer(fileQueue);
//开启几个生产者线程开始遍历文件 for (File file : root.listFiles()) { new Thread(new FileCrawler(fileQueue, file)).start(); } //开启7个消费者者线程开始取出文件 for (int i = 0; i < 7; i++) { new Thread(new Indexer(fileQueue)).start(); }
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. }
System.out.println("生产者生产:" + countCreate.get()); System.out.println("消费者取到:" + count.get()); }
static class FileCrawler implements Runnable { private final BlockingQueue fileQueue; private final File root;
FileCrawler(BlockingQueue fileQueue, File root) { this.fileQueue = fileQueue; this.root = root; }
public void run() {
try { System.out.println("生产者开始生产:" + fileQueue.size()); crawl(root); } catch (InterruptedException e) { e.printStackTrace(); } }
private void crawl(File root) throws InterruptedException { File[] files = root.listFiles(); if (files != null) { for (File file : files) { if (file.isDirectory()) { crawl(file); } else { fileQueue.put(file); //put countCreate.incrementAndGet(); } } } } }
static class Indexer implements Runnable {
private final BlockingQueue fileQueue;
Indexer(BlockingQueue fileQueue) { this.fileQueue = fileQueue; }
public void run() { while (true) { try { System.out.println("消费者开始消费:" + fileQueue.size()); File file = (File) fileQueue.take(); //take count.incrementAndGet(); System.out.println(file.getName()); } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } } }
} |
三、阻塞队列的实现类
基本的任务排队方式有三种:
有界:ArrayBlockingQueue: 一个由数组支持的有界阻塞队列。
无界:LinkedBlockingQueue: 一个基于已链接节点的、范围任意的blocking queue。
同步移交:SynchronousQueue: 同步队列,put和take串行执行。生产者对其的插入操作必须等待消费者的移除操作,反之亦然。同步队列类似于信道,它非常适合传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。
synchronousQueue的思想:
参考:http://ifeve.com/java-synchronousqueue/
实例:
public class SynchroNousQueueTest { public static void main(String args[]) { // final SynchronousQueue synchronousQueue = new SynchronousQueue(); SynchroNousQueueTest synchroNousQueueTest = new SynchroNousQueueTest(); final MyShnchronouseQueue<String> synchronousQueue = synchroNousQueueTest.new MyShnchronouseQueue<String>(); //1。开启一个生产者线程 Thread threadPut = new Thread(new Runnable() { public void run() { try { for (int i = 0; i < 10; i++) { synchronousQueue.put(i + ""); System.out.println("synchronousQueue,insert element:" + i); } } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } });
//2。开启一个消费者线程 Thread threadTask = new Thread(new Runnable() { public void run() { try { for (int i = 0; i < 10; i++) { synchronousQueue.take(); System.out.println("synchronousQueue,output element:" + i); } } catch (InterruptedException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } } }); threadPut.start(); threadTask.start(); }
class MyShnchronouseQueue<E> { Lock lock = new ReentrantLock(); Condition isFull = lock.newCondition(); Condition isEmpty = lock.newCondition(); boolean flag = false; //同步开关 E item = null; //只有一个元素
public void put(E e) throws InterruptedException { lock.lock(); try { while (flag) { // 当开关为true时,put阻塞,一直await isEmpty.await(); } //当开关为false之后,改为true,item设值,唤醒消费者消费 flag = true; item = e; isFull.signalAll(); } catch (Exception e1) { e1.printStackTrace(); } finally { lock.unlock(); }
}
public synchronized E take() throws InterruptedException { lock.lock(); try { while (!flag) { // 当开关为false时,take阻塞,一直await isFull.await(); } //当开关为true之后,改为false,获取item的值,唤醒生产者生产 flag = false; E e = item; item = null; isEmpty.signalAll(); return e; } catch (Exception e1) { e1.printStackTrace(); } finally { lock.unlock(); } return null; } } }
|
结果:
synchronousQueue,insert element:0 synchronousQueue,output element:0 synchronousQueue,output element:1 synchronousQueue,insert element:1 synchronousQueue,insert element:2 synchronousQueue,output element:2 synchronousQueue,insert element:3 synchronousQueue,output element:3 synchronousQueue,insert element:4 synchronousQueue,output element:4 synchronousQueue,insert element:5 synchronousQueue,output element:5 synchronousQueue,insert element:6 synchronousQueue,output element:6 synchronousQueue,insert element:7 synchronousQueue,output element:7 synchronousQueue,insert element:8 synchronousQueue,output element:8 synchronousQueue,insert element:9 synchronousQueue,output element:9 |
四、线程池的选择:
根据这些队列的不同特性,我们的线程池也定义了不同的类别:
单一线程池:可以看到corePoolSize=1
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } |
固定大小线程池:corePoolSize和maximumPoolSize固定,
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } |
无界线程池:maximumPoolSize为Integer.MAX_VALUE
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } |
前两者默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理他们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(这就需要一些饱和策略)在使用有界队列工作时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时可以减少上下文切换,但付出的代价是可能会限制吞吐量。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界或者可以拒绝任务时,SynchronousQueue才有实际价值。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择。它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么狠容易发生过载问题。
从另一个维度来看:cpu密集型任务,由于cpu使用率一直很高,这时的线程不宜过多,建议配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务由于线程并不是一直在执行任务,IO比较频繁,所以可以配置较多的线程,如2*Ncpu。