标签:eve number trying existing note tle 容量 code 个数
这里讲一下Java并发编程的线程池的原理及其实现
2.1 线程池的处理流程图
该图来自《Java并发编程的艺术》:
从图中我们可以看出当一个新任务到线程池时,线程池的处理流程如下:
2.2 线程池的执行示意图
该图是Java并发包中ThreadPoolExecutor的执行示意图,图来自《Java并发编程的艺术》:
从图中我们可以看出来ThreadPoolExecutor执行任务时的处理流程基本如上所诉,每一步的处理情况如下所诉:
2.3 Java ThreadPoolExecutor类
首先来看ThreadPoolExecutor的构造函数的几个参数:
我们由线程池的执行示意图可以看出来,当任务队列满了并且线程池中线程数已经到了线程池容许的最大线程数后,将会有饱和(拒绝)策略来处理该任务。下面是Java线程池提供的4种饱和(拒绝)策略(当然你也可以自定义的自己的饱和策略),注:自己有2个策略也不是太懂,先写在这里:
注:这里使用了ThreadPoolExecutor源码(JDK 1.6)来说明Java线程池的实现原理。自己一些具体的细节也不是太懂,如:线程池的几个状态。自己在这里只是说个线程池的基本处理流程。
3.1 线程池的几个状态
/**
* runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don‘t accept new tasks, but process queued tasks
* STOP: Don‘t accept new tasks, don‘t process queued tasks,
* and interrupt in-progress tasks
* TERMINATED: Same as STOP, plus all threads have terminated
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
*/
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
线程池的几个状态具体表示注释中说的挺明白。
3.2 ThreadPoolExecutor中一些重要的字段
注:我把JDK 1.6中该类的字段的注释保留了,这样能够更好的理解。
/**
* The queue used for holding tasks and handing off to worker
* threads. Note that when using this queue, we do not require
* that workQueue.poll() returning null necessarily means that
* workQueue.isEmpty(), so must sometimes check both. This
* accommodates special-purpose queues such as DelayQueues for
* which poll() is allowed to return null even if it may later
* return non-null when delays expire.
* 阻塞队列:用于存放执行的任务。但可能不存储任务,
* 仅仅作为线程间通信使用,如:Synchronous
*/
private final BlockingQueue workQueue;
/**
* Lock held on updates to poolSize, corePoolSize,
* maximumPoolSize, runState, and workers set.
* 全局锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 存放执行任务的Set
*/
private final HashSet workers = new HashSet();
/**
* Core pool size, updated only while holding mainLock, but
* volatile to allow concurrent readability even during updates.
* 核心线程数
*/
private volatile int corePoolSize;
/**
* Maximum pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
* 最大线程数
*/
private volatile int maximumPoolSize;
/**
* Current pool size, updated only while holding mainLock but
* volatile to allow concurrent readability even during updates.
* 当前线程数
*/
private volatile int poolSize;
3.3 ThreadPoolExecutor执行任务
注:这里使用的execute()方法来说明执行Runnable任务,还有submit()方法执行Callable任务,具体可以看源码:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current RejectedExecutionHandler.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* RejectedExecutionHandler, if task cannot be accepted
* for execution
* @throws NullPointerException if command is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果当前线程数小于核心线程数,则创建线程并执行该任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如果当前线程数大于核心线程数,则将该任务插入任务队列等待核心线程执行
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// 如果当前线程数大于核心数线程数并且该任务插入任务队列失败,
// 则将会判断当前线程数是否小于最大线程数,如果小于则创建线程
// 并执行该任务,否则使用饱和(任务)策略执行该任务
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
/**
* Creates and starts a new thread running firstTask as its first
* task, only if fewer than corePoolSize threads are running
* and the pool is not shut down.
* @param firstTask the task the new thread should run first (or
* null if none)
* @return true if successful
* 仅仅当前线程数小于核心线程数并且线程池状态为运行状态时,
* 创建一个线程作为核心线程并执行该任务。
*/
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
// 创建新线程并执行该任务
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
/**
* Creates and starts a new thread running firstTask as its first
* task, only if fewer than maximumPoolSize threads are running
* and pool is not shut down.
* @param firstTask the task the new thread should run first (or
* null if none)
* 仅仅当前线程数小于最大线程数并且线程池状态为运行状态时,
* 创建一个新线程并执行该任务。
*/
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
// 创建新线程并执行该任务
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
上面的execute()方法不太好理解,需要结合线程池的执行方式并且理解上面其他两个方法的作用进行理解,还有就是上面两个方法在创建线程时都加锁了的。注:自己想不明白为何这样做,这样做仅仅使得代码看起来简洁,但是不太好理解其逻辑。
下面的方法时上面两个方法的共有方法,该方法的意义是:创建新线程并执行该任务
/**
* Creates and returns a new thread running firstTask as its first
* task. Call only while holding mainLock.
*
* @param firstTask the task the new thread should run first (or
* null if none)
* @return the new thread, or null if threadFactory fails to create thread
*/
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
// 调用线程工厂创建线线程
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
// 运行该线程执行任务
t.start();
workerStarted = true;
}
finally {
if (!workerStarted)
workers.remove(w);
}
}
return t;
}
3.4 Worker类
Worker类作为工作线程的任务类,主要是执行任务,该Worker会循环获取任务队列中的任务来执行。注:该类自己有许多不太明白的地方,如shutdown、interrupt等等,所以主要讲执行逻辑。
private final class Worker implements Runnable {
}
Worker类的run()方法,该方法首先执行初始任务,然后通过getTask()方法获取任务:
/**
* Main run loop
*/
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
// 如果获得任务为空,则销毁该工作线程,具体可见源码,这里就没有列出来。
workerDone(this);
}
}
getTask()方法:
/**
* Gets the next task for a worker thread to run. The general
* approach is similar to execute() in that worker threads trying
* to get a task to run do so on the basis of prevailing state
* accessed outside of locks. This may cause them to choose the
* "wrong" action, such as trying to exit because no tasks
* appear to be available, or entering a take when the pool is in
* the process of being shut down. These potential problems are
* countered by (1) rechecking pool state (in workerCanExit)
* before giving up, and (2) interrupting other workers upon
* shutdown, so they can recheck state. All other user-based state
* changes (to allowCoreThreadTimeOut etc) are OK even when
* performed asynchronously wrt getTask.
*
* @return the task
*/
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
// 如果当前线程数大于核心线程数或者允许为核心池线程设置空闲时间
// 将会通过poll(long time,TimeUtile util)方法超时等待任务
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
// 如果当前线程池数小于或等于核心线程数,该线程就会作为核心线程
// 将会阻塞等待下去,直到任务队列中有任务。
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
从上面我们基本可以看出线程池的执行整个过程。
3.5 初始化线程池
默认情况下,创建线程池后线程池中没有工作线程,需要等任务提交后才会创建线程。
实际使用中,ThreadPoolExecutor提供了两个方法创建线程。
代码如下:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null);
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))
++n;
return n;
}
3.6 关闭线程池
首先来看ThreadPoolExecutor关闭线程池的API:
关闭线程池的原理(注:自己不是太懂其原理,所以直接使用的《Java并发编程实战》中关于关闭线程池的说法):
Java线程池中使用shutdown()、shutdownNow()方法来关闭线程池。它们的原理普遍是遍历线程池中的工作线程,然后逐个调用线程的interrupt()方法来中断线程。shutdownNow()方法首先将线程池状态设置为STOP,然后尝试停止所有正在执行或暂停任务的线程,并放回等待执行任务的列表。而shutdown()方法只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
3.7 总结
从上面ThreadPoolExecutor的实现代码中我们可以总结出:
3.8 问题
在上面的代码中我发现在创建线程执行任务会加锁,也就是addIfUnderCorePoolSize()和addIfUnderMaximumPoolSize()方法里面的调用的addThread()方法,该方法会创建线程并执行它。但是,其执行过程会在加锁过程中,这样就造成了加锁时间过长。如下代码:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();
workerStarted = true;
}
finally {
if (!workerStarted)
workers.remove(w);
}
}
return t;
}
这不是最关键的,最关键的是我在深入理解Java之线程池这篇博客看它的代码是这样的(这样的话就没有在执行任务时继续加锁了):
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //创建线程去执行firstTask任务
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); //创建一个线程,执行任务
if (t != null) {
w.thread = t; //将创建的线程的引用赋值为w的成员变量
workers.add(w);
int nt = ++poolSize; //当前线程数加1
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
然后他说他参考的也是JDK 1.6,然后我就蒙了,难道我的是假的JDK 1.6 ..............^ - ^
我们在使用ThreadPoolExecutor时可以通过以下API来监控线程池:
然后还可以通过继承ThreadPoolExecutor类,通过重写beforeExecute()、afterExecute()等方法来自定义线程池。如下是JDK 1.6 API Docs中扩展示例,该示例添加了一个简单的暂停/恢复功能的子类:
public class PausableThreadPoolExecutor extends ThreadPoolExecutor {
public PausableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
private boolean isPaused;
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) {
try {
unpaused.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
pauseLock.unlock();
}
}
/**
* 暂停
*/
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
/**
* 恢复
*/
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
该类能够正确执行是因为工作线程(Worker)执行任务时,会调用beforeExecute(thread, task)方法,具体代码如下(JDK 1.6中ThreadPoolExecutor的Worker内部类的runTask()方法):
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if ((runState >= STOP || (Thread.interrupted() && runState >= STOP)) && hasRun)
thread.interrupt();
boolean ran = false;
// 这里调用了beforeExecute()方法
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
// 这里调用了afterExecute()方法
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
自己依照该线程池的逻辑写的线程池
《Java并发编程的艺术》
深入理解Java之线程池
标签:eve number trying existing note tle 容量 code 个数
原文地址:http://www.cnblogs.com/maying3010/p/6953497.html