码迷,mamicode.com
首页 > 编程语言 > 详细

多线程之线程池-各个参数的含义- 阿里面试题目

时间:2018-03-23 11:43:10      阅读:210      评论:0      收藏:0      [点我收藏+]

标签:eject   block   线程池   thread   阻塞队列   管理   ash   情况   return   

阿里的面试官问了个问题,如果corepollSize=10,MaxPollSize=20,如果来了25个线程 怎么办,

答案:当然是先放在阻塞队列(如果数量为0,就一直等待,LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列,两边都可以进出的,那种,

参考:聊聊并发(七)——Java中的阻塞队列)里面了,BlockingQueue,面试官想知道具体的处理流程,我掌握的不深,于是下定决心好好查查:

尤其是那个车间里工人的例子,好好看看,理解线程很有用:

 

在上一章中我们概述了一下线程池,这一章我们看一下创建newFixedThreadPool的源码。例子还是我们在上一章中写的那个例子。

创建newFixedThreadPool的方法:

 

public static ExecutorService newFixedThreadPool(int nThreads) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>());  
}  

 

   

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {  
    return new ThreadPoolExecutor(nThreads, nThreads,  
                                  0L, TimeUnit.MILLISECONDS,  
                                  new LinkedBlockingQueue<Runnable>(),  
                                  threadFactory);  
}  

 

上面这两个方法是创建固定数量的线程池的两种方法,两者的区别是:第二种创建方法多了一个线程工厂的方法。我们继续看ThreadPoolExecutor这个类中的构造函数:

 

ThreadPoolExecutor的构造函数:

  

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue) {  
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,  
         Executors.defaultThreadFactory(), defaultHandler);  
}  

public ThreadPoolExecutor(int corePoolSize,  
                          int maximumPoolSize,  
                          long keepAliveTime,  
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,  
                          ThreadFactory threadFactory,  
                          RejectedExecutionHandler handler) {  
    if (corePoolSize < 0 ||  
        maximumPoolSize <= 0 ||  
        maximumPoolSize < corePoolSize ||  
        keepAliveTime < 0)  
        throw new IllegalArgumentException();  
    if (workQueue == null || threadFactory == null || handler == null)  
        throw new NullPointerException();  
    this.corePoolSize = corePoolSize;  
    this.maximumPoolSize = maximumPoolSize;  
    this.workQueue = workQueue;  
    this.keepAliveTime = unit.toNanos(keepAliveTime);  
    this.threadFactory = threadFactory;  
    this.handler = handler;  
}  

 

ThreadPollExecutor中的所有的构造函数最终都会调用上面这个构造函数,接下来我们来分析一下这些参数的含义: 

corePoolSize:

线程池启动后,在池中保持的线程的最小数量。需要说明的是线程数量是逐步到达corePoolSize值的。例如corePoolSize被设置为10,而任务数量只有5,则线程池中最多会启动5个线程,而不是一次性地启动10个线程。

maxinumPoolSize:

线程池中能容纳的最大线程数量,如果超出,则使用RejectedExecutionHandler拒绝策略处理。 

keepAliveTime:

线程的最大生命周期。这里的生命周期有两个约束条件:一:该参数针对的是超过corePoolSize数量的线程;二:处于非运行状态的线程。举个例子:如果corePoolSize(最小线程数)为10,maxinumPoolSize(最大线程数)为20,而此时线程池中有15个线程在运行,过了一段时间后,其中有3个线程处于等待状态的时间超过keepAliveTime指定的时间,则结束这3个线程,此时线程池中则还有12个线程正在运行。

unit:

这是keepAliveTime的时间单位,可以是纳秒,毫秒,秒,分钟等。

workQueue: 

任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。这个任务队列是一个阻塞式的单端队列。 

threadFactory:

定义如何启动一个线程,可以设置线程的名称,并且可以确定是否是后台线程等。

handler:

拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。
OK,ThreadPoolExecutor中的主要参数介绍完了。我们再说一下线程的管理过程:首先创建一个线程池,然后根据任务的数量逐步将线程增大到corePoolSize,如果此时仍有任务增加,则放置到workQueue中,直到workQueue爆满为止,然后继续增加池中的线程数量(增强处理能力),最终达到maxinumPoolSize。那如果此时还有任务要增加进来呢?这就需要handler来处理了,或者丢弃新任务,或者拒绝新任务,或者挤占已有的任务。在任务队列和线程池都饱和的情况下,一旦有线程处于等待(任务处理完毕,没有新任务)状态的时间超过keepAliveTime,则该线程终止,也就是说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。在《编写高质量代码 改善Java程序的151个建议》这本书里举的这个例子很形象:
技术分享图片技术分享图片
 
OK,接下来我们来看一下怎么往任务队里中放入线程任务:在java.util.concurrent.AbstractExecutorService这个类的submit方法

submit方法

public Future<?> submit(Runnable task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<Void> ftask = newTaskFor(task, null);  
    execute(ftask);//执行任务  
    return ftask;  
}  
  
/** 
 * @throws RejectedExecutionException {@inheritDoc} 
 * @throws NullPointerException       {@inheritDoc} 
 */  
public <T> Future<T> submit(Runnable task, T result) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task, result);  
    execute(ftask);//执行任务  
    return ftask;  
}  
  
/** 
 * @throws RejectedExecutionException {@inheritDoc} 
 * @throws NullPointerException       {@inheritDoc} 
 */  
public <T> Future<T> submit(Callable<T> task) {  
    if (task == null) throw new NullPointerException();  
    RunnableFuture<T> ftask = newTaskFor(task);  
    execute(ftask);//执行任务  
    return ftask;  
}  

 

这是三个重载方法,分别对应Runnable、带结果的Runnable接口和Callable回调函数。其中的newTaskFor也是一个重载的方法,它通过层层的包装,把Runnable接口包装成了适配RunnableFuture的实现类,底层实现如下:

public FutureTask(Runnable runnable, V result) {  
    this.callable = Executors.callable(runnable, result);  
    this.state = NEW;       // ensure visibility of callable  
}  

 

 
public static <T> Callable<T> callable(Runnable task, T result) {  
    if (task == null)  
        throw new NullPointerException();  
    return new RunnableAdapter<T>(task, result);  
}  

 

static final class RunnableAdapter<T> implements Callable<T> {  
    final Runnable task;  
    final T result;  
    RunnableAdapter(Runnable task, T result) {  
        this.task = task;  
        this.result = result;  
    }  
    public T call() {  
        task.run();  
        return result;  
    }  
}  

 

在submit中最重要的是execute这个方法,这个方法也是我们分析的重点

execute方法:

 
public void execute(Runnable command) {  
    if (command == null)  
        throw new NullPointerException();  
    int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) {//  
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            addWorker(null, false);  
    }  
    else if (!addWorker(command, false))  
        reject(command);  
}  

 

在这个方法中分为三部分
1、如果少于corePoolSize数量的线程在运行,则启动一个新的线程并把传进来的Runnable做为第一个任务。然后会检查线程的运行状态和worker的数量,阻止不符合要求的任务添加到线程中
2、如果一个任务成功的放入到了队列中,我们仍然需要二次检查我们是否应该添加线程或者停止。因此我们重新检查线程状态,是否需要回滚队列,或者是停止或者是启动一个新的线程
3、如果我们不能添加队列任务了,但是仍然在往队列中添加任务,如果添加失败的话,用拒绝策略来处理。
这里最主要的是addWorker这个方法:
try {  
    w = new Worker(firstTask);  
    final Thread t = w.thread;  
    if (t != null) {  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try {  
            // Recheck while holding lock.  
            // Back out on ThreadFactory failure or if  
            // shut down before lock acquired.  
            int rs = runStateOf(ctl.get());  
  
            if (rs < SHUTDOWN ||  
                (rs == SHUTDOWN && firstTask == null)) {  
                if (t.isAlive()) // precheck that t is startable  
                    throw new IllegalThreadStateException();  
                workers.add(w);  
                int s = workers.size();  
                if (s > largestPoolSize)  
                    largestPoolSize = s;  
                workerAdded = true;  
            }  
        } finally {  
            mainLock.unlock();  
        }  
        if (workerAdded) {  
            t.start();  
            workerStarted = true;  
        }  
    }  
} finally {  
    if (! workerStarted)  
        addWorkerFailed(w);  
}  

 

我们在这个方法里创建一个线程,注意这个线程不是我们的任务线程,而是经过包装的Worker线程。所以这里的run方法是Worker这个类中的run方法。execute方法是通过Worker类启动的一个工作线程,执行的是我们的第一个任务,然后该线程通过getTask方法从任务队列总获取任务,之后再继续执行。这个任务队列是一个BlockingQueue,是一个阻塞式的,也就是说如果该队列元素为0,则保持等待状态。直到有任务进入为止。

多线程之线程池-各个参数的含义- 阿里面试题目

标签:eject   block   线程池   thread   阻塞队列   管理   ash   情况   return   

原文地址:https://www.cnblogs.com/aspirant/p/8628843.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!