java 1.5 concurrent 工具包中提供了五类线程池的创建:
ExecutorService executor=Executors.newCachedThreadPool(); ExecutorService cacheExecutor=Executors.newCachedThreadPool(new TestThreadFactory()); ExecutorService fixExecutor=Executors.newFixedThreadPool(10); ExecutorService fixedExecutor=Executors.newFixedThreadPool(10, new TestThreadFactory()); ExecutorService sigExecutor=Executors.newSingleThreadExecutor(); ExecutorService singleExecutor=Executors.newSingleThreadExecutor(new TestThreadFactory()); ScheduledExecutorService schExecutor=Executors.newScheduledThreadPool(10); ScheduledExecutorService scheduledExecutor=Executors.newScheduledThreadPool(10,new TestThreadFactory()); ScheduledExecutorService ssExecutor=Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService sigSchExcutor=Executors.newSingleThreadScheduledExecutor(new TestThreadFactory());
SingleThreadExecutor其实是FixedThreadPool的一个特例,SingleThreadExecutor指定对于同一个队列只有一个线程去循环读取队列任务并执行, FiexedThreadPool则可以为同一队列指定多个线程去循环读取队列任务并执行.
newFixedThreadPool(10)会产生10个线程去读取同一个任务队列,但这10个线程不是同时产生,而是提交一个任务(即执行一次execute()或者submit()方法)产生一个,当提交的任务数量超过10个,第11个任务直接提交到blockQueue<Runnable>队列里,然后由这10个线程中的某个线程去获取并执行该任务.FixedThreadPool产生的10个线程以后也不会被回收成9个,更不可能增加到11个.
CacheThreadPool不指定具体数量的线程去读取并只执行任务队列中的任务,但是它有个最大线程数(Integer.MAX_VALUE=2的32次-1), 当 任务队列饱和无法插入新任务时,会自动生成一个新的线程去执行新插入的任务,并参与读取饱和的任务队列并执行.如果高峰期生成了10个线程,低谷期只需要一个线程来执行,其余的9个线程在存活一段时间后就会被终止.存活时间默认是一分钟.这一点要和FixedThreadPool区分.
ScheduledThreadPool线程池线程数量也需要预先指定,它的主要特点是按计划延时读取并执行队列任务
无论何种线程,当任务队列增加任务的速度大于队列读取执行的速度时,就可能产生任务丢失的情况,丢失的概率由低到高依次是
CacheThreadPool > newFixedThreadPool > SingleThreadExecutor,这个很好理解.这种情况下,程序默认都会向外抛出RejectedExecutionException异常
new 线程池的时候另一个构造参数 ThreadFactory,主要用途就是对提交的任务做个简单的封装.
附上几个核心的代码片段
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
/** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * Ensure that unless pool is stopping, this thread * does not have its interrupt set. This requires a * double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); /* * Track execution state to ensure that afterExecute * is called only if task completed or threw * exception. Otherwise, the caught runtime exception * will have been thrown by afterExecute itself, in * which case we don't want to call it again. */ boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Main run loop */ public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
原文地址:http://blog.csdn.net/zhuyijian135757/article/details/42199147