码迷,mamicode.com
首页 > 编程语言 > 详细

深入学习Java线程池

时间:2019-08-28 21:09:32      阅读:167      评论:0      收藏:0      [点我收藏+]

标签:方式   条件   max   long   future   catch   rdp   影响   设定   

在前面的例子中,我们都是通过new Thread来创建一个线程,由于线程的创建和销毁都需要消耗一定的CPU资源,所以在高并发下这种创建线程的方式将严重影响代码执行效率。而线程池的作用就是让一个线程执行结束后不马上销毁,继续执行新的任务,这样就节省了不断创建线程和销毁线程的开销。

ThreadPoolExecutor

创建Java线程池最为核心的类为ThreadPoolExecutor

技术图片

它提供了四种构造函数来创建线程池,其中最为核心的构造函数如下所示:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

 

这7个参数的含义如下:

  1. corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即使这些线程是空闲的,也不会被销毁,除非通过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;

  2. maximumPoolSize 线程池中允许的最大线程个数;

  3. keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间还没有新任务的话,超出的线程将被销毁;

  4. unit 超时时间单位;

  5. workQueue 线程队列。用于保存通过execute方法提交的,等待被执行的任务;

  6. threadFactory 线程创建工程,即指定怎样创建线程;

  7. handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。

在通过这个构造方法创建线程池的时候,这几个参数必须满足以下条件,否则将抛出IllegalArgumentException异常:

  1. corePoolSize不能小于0;

  2. keepAliveTime不能小于0;

  3. maximumPoolSize 不能小于等于0;

  4. maximumPoolSize不能小于corePoolSize;

此外,workQueue、threadFactory和handler不能为null,否则将抛出空指针异常。

下面举些例子来深入理解这几个参数的含义。

使用上面的构造方法创建一个线程池:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池创建完毕");

int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}

 

上面的代码创建了一个核心线程数量为1,允许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。

假如我们通过execute方法向线程池提交1个任务,看看结果如何:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("线程池创建完毕");

threadPoolExecutor.execute(() -> sleep(100));

int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());
System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());
System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());
System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}

 

ThreadPoolExecutor的execute和submit方法都可以向线程池提交任务,区别是,submit方法能够返回执行结果,返回值类型为Future

sleep方法代码:

1
2
3
4
5
6
7
8
private static void sleep(long value) {
try {
System.out.println(Thread.currentThread().getName() + "线程执行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

 

启动程序,控制台输出如下:技术图片

线程池核心线程数量为1,通过execute提交了一个任务后,由于核心线程是空闲的,所以任务被执行了。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。此外,因为提交的任务被核心线程执行了,所以并没有线程需要被放到线程队列里等待,线程队列长度为0。

假如我们通过execute方法向线程池提交2个任务,看看结果如何:

1
2
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));

 

技术图片

线程池核心线程数量为1,通过execute提交了2个任务后,一开始核心线程是空闲的,Thread-0被执行。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。因为核心线程数量为1,所以另外一个任务在这100秒内不能被执行,于是被放到线程队列里等待,线程队列长度为1。

假如我们通过execute方法向线程池提交3个任务,看看结果如何:

1
2
3
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));

 

技术图片

这三个任务都是休眠100秒,所以核心线程池中第一个任务正在被执行,第二个任务被放入到了线程队列。而当第三个任务被提交进来时,线程队列满了(我们定义的长度为1),由于该线程池允许的最大线程数量为2,所以线程池还可以再创建一个线程来执行另外一个任务,于是乎之前在线程队列里的线程被取出执行(FIFO),第三个任务被放入到了线程队列。

改变第二个和第三个任务的睡眠时间,观察输出:

1
2
3
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));

 

技术图片

第二个任务提交5秒后,任务执行完毕,所以线程队列里的任务被执行,于是队列线程个数为0,活跃线程数量为2(第一个和第三个任务)。再过5秒后,第三个任务执行完毕,于是活跃线程数量为1(第一个100秒还没执行完毕)。

在第三个任务结束的瞬间,我们观察线程快照:

技术图片

可以看到,线程池中有两个线程,Thread-0在执行第一个任务(休眠100秒,还没结束),Thread-1执行完第三个任务后并没有马上被销毁。过段时间后(10秒钟后)再观察线程快照:

技术图片

可以看到,Thread-1这个线程被销毁了,因为我们在创建线程池的时候,指定keepAliveTime 为10秒,10秒后,超出核心线程池线程外的那些线程将被销毁。

假如一次性提交4个任务,看看会怎样:

1
2
3
4
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));
threadPoolExecutor.execute(() -> sleep(100));

 

技术图片

因为我们设置的拒绝策略为AbortPolicy,所以最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。

关闭线程池

线程池包含以下几个状态:

技术图片

当线程池中所有任务都处理完毕后,线程并不会自己关闭。我们可以通过调用shutdownshutdownNow方法来关闭线程池。两者的区别在于:

  1. shutdown方法将线程池置为shutdown状态,拒绝新的任务提交,但线程池并不会马上关闭,而是等待所有正在折行的和线程队列里的任务都执行完毕后,线程池才会被关闭。所以这个方法是平滑的关闭线程池。

  2. shutdownNow方法将线程池置为stop状态,拒绝新的任务提交,中断正在执行的那些任务,并且清除线程队列里的任务并返回。所以这个方法是比较“暴力”的。

举两个例子观察下两者的区别:

shutdown例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());

threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());

threadPoolExecutor.shutdown();
System.out.println("已经执行了线程池shutdown方法");
}

static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

启动程序,控制台输出如下:

技术图片

可以看到,虽然在任务都被提交后马上执行了shutdown方法,但是并不会马上关闭线程池,而是等待所有被提交的任务都执行完了才关闭。

shutdownNow例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());

threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());

List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 马上关闭,并返回还未被执行的任务
System.out.println(runnables);

System.out.println("已经执行了线程池shutdownNow方法");
}

static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

启动程序,控制台输出如下:技术图片

可以看到,在执行shutdownNow方法后,线程池马上就被关闭了,正在执行中的两个任务被打断,并且返回了线程队列中等待被执行的两个任务。

通过上面两个例子我们还可以看到shutdownshutdownNow方法都不是阻塞的。常与shutdown搭配的方法有awaitTermination

awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。该方法是阻塞的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 4, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy());

threadPoolExecutor.execute(new shortTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new longTask());
threadPoolExecutor.execute(new shortTask());

threadPoolExecutor.shutdown();
boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
if (isShutdown) {
System.out.println("线程池在3秒内成功关闭");
} else {
System.out.println("等了3秒还没关闭,不等了╰(‵□′)╯");
}
System.out.println("------------");
}

static class shortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

启动程序输出如下:

技术图片

4大拒绝策略

当线程池无法再接收新的任务的时候,可采取如下四种策略:技术图片

CallerRunsPolicy

CallerRunsPolicy策略:由调用线程处理该任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.CallerRunsPolicy());

threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));

threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

private String name;

public shortTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {

private String name;

public longTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

上面的线程池最多只能一次性提交4个任务,第5个任务提交后会被拒绝策略处理。启动程序输出如下:

技术图片

可以看到,第5个提交的任务由调用线程(即main线程)处理该任务。

AbortPolicy

AbortPolicy策略:丢弃任务,并抛出RejectedExecutionException异常。前面的例子就是使用该策略,所以不再演示。

DiscardOldestPolicy

DiscardOldestPolicy策略:丢弃最早被放入到线程队列的任务,将新提交的任务放入到线程队列末端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardOldestPolicy());

threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));

threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

private String name;

public shortTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {

private String name;

public longTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

启动程序输出如下:

技术图片

可以看到最后提交的任务被执行了,而第3个任务是第一个被放到线程队列的任务,被丢弃了。

DiscardPolicy

DiscardPolicy策略:直接丢弃新的任务,不抛异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 3, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.DiscardPolicy());

threadPoolExecutor.execute(new shortTask("任务1"));
threadPoolExecutor.execute(new longTask("任务2"));
threadPoolExecutor.execute(new longTask("任务3"));
threadPoolExecutor.execute(new shortTask("任务4"));
threadPoolExecutor.execute(new shortTask("任务5"));

threadPoolExecutor.shutdown();
}

static class shortTask implements Runnable {

private String name;

public shortTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("shortTask执行过程中被打断" + e.getMessage());
}
}
}

static class longTask implements Runnable {

private String name;

public longTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");
} catch (InterruptedException e) {
System.err.println("longTask执行过程中被打断" + e.getMessage());
}
}
}

 

启动程序,输出如下:技术图片

第5个任务直接被拒绝丢弃了,而没有抛出任何异常。

线程池工厂方法

除了使用ThreadPoolExecutor的构造方法创建线程池外,我们也可以使用Executors提供的工厂方法来创建不同类型的线程池:

技术图片

newFixedThreadPool

查看newFixedThreadPool方法源码:

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

 

可以看到,通过newFixedThreadPool创建的是一个固定大小的线程池,大小由nThreads参数指定,它具有如下几个特点:

  1. 因为corePoolSize和maximumPoolSize的值都为nThreads,所以线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。

  2. LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),所以这个线程池理论是可以无限的接收新的任务,这就是为什么上面没有指定拒绝策略的原因。

newCachedThreadPool

查看newCachedThreadPool方法源码:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

 

这是一个理论上无限大小的线程池:

  1. 核心线程数为0,SynchronousQueue队列是没有长度的队列,所以当有新的任务提交,如果有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,否则新增一个线程来处理该任务。

  2. 因为线程数量没有限制,理论上可以接收无限个新任务,所以这里也没有指定拒绝策略。

newSingleThreadExecutor

查看newSingleThreadExecutor源码:

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

 

  1. 核心线程数和最大线程数都为1,每次只能有一个线程处理任务。

  2. LinkedBlockingQueue队列可以接收无限个新任务。

newScheduledThreadPool

查看newScheduledThreadPool源码:

1
2
3
4
5
6
7
8
9
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
......

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

 

所以newScheduledThreadPool理论是也是可以接收无限个任务,DelayedWorkQueue也是一个无界队列。

使用newScheduledThreadPool创建的线程池除了可以处理普通的Runnable任务外,它还具有调度的功能:

1.延迟指定时间后执行:

1
2
3
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟5秒执行
executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);

 

2.按指定的速率执行:

1
2
3
4
5
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延迟1秒执行,然后每5秒执行一次
executorService.scheduleAtFixedRate(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);

 

技术图片

3.按指定的时延执行:

1
2
3
4
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(
() -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS
);

 

技术图片

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们还是有区别的:

  • scheduleAtFixedRate按照固定速率执行任务,比如每5秒执行一个任务,即使上一个任务没有结束,5秒后也会开始处理新的任务;

  • scheduleWithFixedDelay按照固定的时延处理任务,比如每延迟5秒执行一个任务,无论上一个任务处理了1秒,1分钟还是1小时,下一个任务总是在上一个任务执行完毕后5秒钟后开始执行。

对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:

技术图片

因为这几个线程池理论是都可以接收无限个任务,所以这就有内存溢出的风险。实际上只要我们掌握了ThreadPoolExecutor构造函数7个参数的含义,我们就可以根据不同的业务来创建出符合需求的线程池。一般线程池的创建可以参考如下规则:

  • IO密集型任务,线程池线程数量可以设置为2 X CPU核心数;

  • 计算密集型任务,线程池线程数量可以设置为CPU核心数 + 1。

一些API的用法

ThreadPoolExecutor提供了几个判断线程池状态的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);

threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

threadPoolExecutor.shutdown();
System.out.println("线程池为shutdown状态:" + threadPoolExecutor.isShutdown());
System.out.println("线程池正在关闭:" + threadPoolExecutor.isTerminating());
System.out.println("线程池已经关闭:" + threadPoolExecutor.isTerminated());
threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);
System.out.println("线程池已经关闭" + threadPoolExecutor.isTerminated());
}

 

程序输出如下:

技术图片

前面我们提到,线程池核心线程即使是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了允许核心线程超时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任务执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

 

程序输出如下所示:

技术图片

5秒后任务执行完毕,核心线程处于空闲的状态。因为通过allowCoreThreadTimeOut方法设置了允许核心线程超时,所以3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有作用了,于是就自动关闭了。

值得注意的是,如果一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。

ThreadPoolExecutor提供了一remove方法,查看其源码:

1
2
3
4
5
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}

 

可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("任务执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

Runnable r = () -> System.out.println("看看我是否会被删除");
threadPoolExecutor.execute(r);
threadPoolExecutor.remove(r);

threadPoolExecutor.shutdown();
}

 

执行程序,输出如下:

技术图片

可看到任务并没有被执行,已经被删除,因为唯一一个核心线程已经在执行任务了,所以后提交的这个任务被放到了线程队列里,然后通过remove方法删除。

默认情况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。我们可以通过调用prestartCoreThread方法,让核心线程即使没有任务提交,也处于等待执行任务的活跃状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 2, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());
}

 

程序输出如下所示:

技术图片

该方法返回boolean类型值,如果所以核心线程都启动了,返回false,反之返回true。

还有一个和它类似的prestartAllCoreThreads方法,它的作用是一次性启动所有核心线程,让其处于活跃地等待执行任务的状态。

ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);

// 任务集合
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
// 随机执行结果
Integer result = threadPoolExecutor.invokeAny(tasks);
System.out.println("-------------------");
System.out.println(result);
threadPoolExecutor.shutdownNow();
}

 

启动程序,输出如下:

技术图片

ThreadPoolExecutor的invokeAll则是执行任务集合中的所有任务,返回Future集合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,
new ThreadPoolExecutor.AbortPolicy()
);

List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());

List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);
futureList.stream().map(f->{
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).forEach(System.out::println);

threadPoolExecutor.shutdownNow();
}

 

输出如下:技术图片

总结下这些方法:

方法描述
allowCoreThreadTimeOut(boolean value) 是否允许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭
awaitTermination(long timeout, TimeUnit unit) 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
execute(Runnable command) 向线程池提交任务,没有返回值
submit(Runnable task) 向线程池提交任务,返回Future
isShutdown() 判断线程池是否为shutdown状态
isTerminating() 判断线程池是否正在关闭
isTerminated() 判断线程池是否已经关闭
remove(Runnable task) 移除线程队列中的指定任务
prestartCoreThread() 提前让一个核心线程处于活跃状态,等待执行任务
prestartAllCoreThreads() 提前让所有核心线程处于活跃状态,等待执行任务
getActiveCount() 获取线程池活跃线程数
getCorePoolSize() 获取线程池核心线程数
threadPoolExecutor.getQueue() 获取线程池线程队列
getMaximumPoolSize() 获取线程池最大线程数
shutdown() 让线程池处于shutdown状态,不再接收任务,等待所有正在运行中的任务结束后,关闭线程池。
shutdownNow() 让线程池处于stop状态,不再接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。

深入学习Java线程池

标签:方式   条件   max   long   future   catch   rdp   影响   设定   

原文地址:https://www.cnblogs.com/7788IT/p/11426443.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!