相关类Executor,Executors,AbstractExecutorService,ExecutorService
Executor:整个线程池执行者框架的顶层接口。定义了一个execute方法,整个线程执行者框架的核心方法。
public interface Executor { void execute(Runnable command); }
AbstractExecutorService:实现了ExecutorService接口中的submit,invokeAll的方法。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
在这里,所有submit方法提交的任务最终还是调用了execute方法,execute是接口Executor中定义的方法,AbstractExecutorService没有实现它,
需要子类去实现这个方法,ThreadPoolExecutor继承了AbstractExecutorService,它实现了execute方法。ScheduledThreadPoolExecutor继承自
ThreadPoolExecutor,并覆盖了ThreadPoolExecutor的execute方法。这个方法是线程执行框者架的核心逻辑,不同的线程池执行者有不同的实现逻辑。
AbstractExecutorService的功能较为简单,实现了不同参数的submit,invokeAll方法。
ThreadPoolExecutor线程池执行者:它有一个核心的成员变量:
private final HashSet<Worker> workers = new HashSet<Worker>();
workers可以看做是ThreadPoolExecutor中用于运行任务的线程池。
worker是一个封装了一个Thread对象并实现了Runnable接口的类。封装Thread很容易理解,因为它要利用Thread去运行execute方法提交过来的runnable任务,
但是为什么会继承runnable接口呢?
下面是剔除了部分代码的Worker源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } }
跟踪runWorker(this)方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//在这里直接调用了目标任务的run方法,并没有将它传给Thread对象。 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
下面看看ThreadPoolExecutor中的一个其他方法:
private boolean addWorker(Runnable firstTask, boolean core) { ...... try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread;//这里初始化一个Worker对象w,在将w的成员变量thread付给t if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//在这里调用t的start方法。 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
这里仅仅介绍了ThreadPoolExecutor的线程池,那么这个线程池是如何被维护的,下面介绍几个关键的参数。
private volatile int corePoolSize; private volatile int maximumPoolSize; private final BlockingQueue<Runnable> workQueue;
corePoolSize是线程池的核心大小,maximumPoolSize是线程池的最大大小。
当提交新任务时,如果ThreadPoolExecutor中有线程在运行,并且线程的数量小于corePoolSize,那么就会有新的线程被创建。如果当前运行的线程数大于corePoolSize,就会放到缓存队列workQueue中。如果缓冲队列也满了,就继续创建线程,直到线程的数量达到maximumPoolSizepublic void execute(Runnable command) { if (command == null) throw new NullPointerException(); //判断如果当前运行的线程数小于 corePoolSize,添加新的线程(addWorker会添加一个新的线程,上面有介绍),方法直接返回。 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //如果当前的运行的线程数大于或等于corePoolSize则新的任务会放到缓存队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //最后缓冲队列添加失败,则会继续添加线程。如果添加新的线程失败,则拒绝这个任务。 reject(command); }
private volatile ThreadFactory threadFactory //线程的工厂函数。 private volatile RejectedExecutionHandler handler;//任务拒绝的处理类。 private volatile long keepAliveTime;//任务等待的是将。ThreadPoolExecutor有几个构造方法来初始化这些参数。Executors类将这些参数简化了来获得一个ExecutorService的引用。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
这四个方法中前两个的核心线程数和最大线程数相同,所有可运行的线程数是固定的,<=nThreads。当任务数大于nThreads时,就是放入缓冲队列中。 后两个方法中,线程数是无边界的,核心线程数是0,最大线程数是整型的最大值,然后如果有线程60秒内没有任务运行的话就销毁。每次有新的任务来,都会创建新的线程或使用以前创建的线程(60秒内没有任务运行的线程)。你可能有疑问,既然核心线程数是0,那么所有的任务不是都放到队里里了吗?那么现在就来看看SynchronousQueue这个队里,可以看看这里的介绍http://wsmajunfeng.iteye.com/blog/1629352/。
回过头来看看任务提交方法的源码:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//这里是在往队列里方任务,如果不成功就会添加Worker(封装了线程对象) int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
原文地址:http://blog.csdn.net/ilovezhangxian/article/details/40583461