标签:
一、序言
当我们需要使用线程的时候,我们可以随时新建一个线程,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创建线程会降低系统的效率,因为创建和销毁线程均需要一定的时间。线程池可以使线程得到复用,所谓线程复用就是线程在执行完一个任务后并不被销毁,该线程可以继续执行其他的任务。
二、Executors提供的线程池
Executors是线程的工厂类,也可以说是一个线程池工具类,Executors提供的线程都是通过参数设置来实现不同的线程池机制。
三、Executors的简单使用示例
package com.test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorsDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { // ExecutorService executor = Executors.newSingleThreadExecutor(); // ExecutorService executor = Executors.newCachedThreadPool(); ExecutorService executor = Executors.newFixedThreadPool(5); Thread.sleep(10000L);//方便监控工具能捕获到 for (int i = 0; i < 20; i++) { final int no = i; Runnable runnable = new Runnable() { public void run() { try { System.out.println("into" + no); Thread.sleep(1000L); System.out.println("end" + no); } catch (InterruptedException e) { e.printStackTrace(); } } }; executor.execute(runnable); }//End for executor.shutdown(); System.out.println("Thread Main End!"); } }
上述代码创建了一个固定长度的线程池,其运行结果如下:
Thread Main End!
into2
into3
into0
into1
into4
end2
into5
end3
into6
end0
into7
end1
into8
end4
into9
end6
end5
into10
into11
end9
end7
into13
end8
into14
into12
end11
end10
into15
into16
end13
end12
into17
end14
into19
into18
end15
end16
end19
end17
end18
从上面的结果来看,在某一时刻只有5个线程在执行(创建固定大小的线程池),然后结束一个再执行一个。
解说:一个任务通过 execute(Runnable)方法被添加到线程池,任务是一个Runnable类型的对象,任务的执行方法是Runnable类型对象的run()方法。
四、简述线程池的属性
五、详解ThreadPoolExecutor
上文提到可以通过显式的ThreadPoolExecutor构造函数来构造特定形式的线程池,ThreadPoolExecutor是java.util.concurrent包以内部线程池的形式对外提供线程池管理、线程调度等服务,此处我们来了解一下ThreadPoolExecutor
(1)一般使用方式:
ExecutorService exec = new ThreadPoolExecutor(8, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
下文详解此示例涉及的一些内容
(2)构造函数的声明:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
(3)函数参数说明:
参数名 | 代表含义 |
corePoolSize | 线程池的基本大小(核心线程池大小) |
maximumPoolSize | 线程池的最大大小 |
keepAliveTime | 线程池中超过corePoolSize数目的空闲线程的最大存活时间 |
unit | keepAliveTime参数的时间单位 |
workQueue | 任务阻塞队列 |
threadFactory | 新建线程的工厂 |
handler | 当提交的任务数超过maxmumPoolSize与workQueue之和时,任务会交给RejectedExecutionHandler来处理 |
进一步解说:
A、当提交新任务时,若线程池大小小于corePoolSize,将创建一个新的线程来执行任务,即使此时线程池中存在空闲线程;
B、当提交新任务时,若线程池达到corePoolSize大小,新提交的任务将被放入workQueue中,等待线程池调度执行;
C、当提交新任务时,若workQueue已满,且maximumPoolSize>corePoolSize,将创建新的线程来执行任务;
D、当提交新任务时,若任务总数超过maximumPoolSize,新提交的任务将由RejectedExecutionHandler来处理;
E、当线程池中的线程数超过corePoolSize时,若线程的空闲时间达到keepAliveTime,则关闭空闲线程
(4)任务阻塞队列选择机制
(5)简述SynchronousQueue
注:此处贴出SynchronousQueue的使用示例,示例中使用了Semaphore,更多关于SynchronousQueue及Semaphore的内容请参考其他文章
package com.test; import java.util.concurrent.Semaphore; import java.util.concurrent.SynchronousQueue; /* * 程序中有10个线程来消费生成者产生的数据,这些消费者都调用TestDo.doSome()方法去进行处理, * 每个消费者都需要一秒才能处理完,程序应保证这些消费者线程依次有序地消费数据,只有上一个消费者消费完后, * 下一个消费者才能消费数据,下一个消费者是谁都可以,但要保证这些消费者线程拿到的数据是有顺序的。 */ public class SynchronousQueueTest { public static void main(String[] args) { System.out.println("begin:" + (System.currentTimeMillis() / 1000)); // 定义一个Synchronous final SynchronousQueue<String> sq = new SynchronousQueue<String>(); // 定义一个数量为1的信号量,其作用相当于一个互斥锁 final Semaphore sem = new Semaphore(1); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { try { sem.acquire(); String input = sq.take(); String output = TestDo.doSome(input);//内部类 System.out.println(Thread.currentThread().getName()+ ":" + output); sem.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } for (int i = 0; i < 10; i++) { String input = i + ""; //此处将i变成字符串 try { sq.put(input); } catch (InterruptedException e) { e.printStackTrace(); } } }//End main } class TestDo { public static String doSome(String input) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String output = input + ":" + (System.currentTimeMillis() / 1000); return output; } }
上述代码的运行结果如下:
begin:1458954798 Thread-0:0:1458954799 Thread-1:1:1458954800 Thread-2:2:1458954801 Thread-3:3:1458954802 Thread-4:4:1458954803 Thread-5:5:1458954804 Thread-6:6:1458954805 Thread-7:7:1458954806 Thread-8:8:1458954807 Thread-9:9:1458954808
从上述结果看,上例在任意某一时刻只有一个线程在执行,且只有前一个线程执行完下一个线程才开始
六、饱和策略(线程池任务拒绝策略)
上文提到ThreadPoolExecutor构造函数的RejectedExecutionHandler handler参数,该参数表示当提交的任务数超过maxmumPoolSize与workQueue之和时,任务会交给RejectedExecutionHandler来处理,此处我们来具体了解一下
(1)四种饱和策略
(2)源码分析:
RejectedExecutionHandler这个接口是用来处理被丢弃的线程的异常处理接口,其源码如下:
public interface RejectedExecutionHandler{ //被线程池丢弃的线程处理机制 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) ; }
AbortPolicy(中止策略)继承RejectedExecutionHandler接口,其源码如下:
public static class AbortPolicy implements RejectedExecutionHandler{ public AbortPolicy(){} //直接抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RejectedExecutionException("Task"+r.toString()+"rejected from"+executor.toString()); } }
我们可以自己实现RejectedExecutionHandler接口,将实现类作为线程丢弃处理类,代码如下:
package com.test; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectedExecutionHandlerDemo implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // TODO Auto-generated method stub System.out.println("线程信息"+r.toString()+"被遗弃的线程池:"+executor.toString()); } }
七、定制ThreadPoolExecutor
(1)通过修改参数的方式达到定制目的
(2)通过自定义方式(封装各种参数)达到定制目的
示例(摘自网络):
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class CustomThreadPoolExecutor { private ThreadPoolExecutor pool = null; /** * 线程池初始化方法 * * corePoolSize 核心线程池大小----10 * maximumPoolSize 最大线程池大小----30 * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列 * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂 * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时, * 即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)), * 任务会交给RejectedExecutionHandler来处理 */ public void init() { pool = new ThreadPoolExecutor( 10, 30, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10), new CustomThreadFactory(), new CustomRejectedExecutionHandler()); } public void destory() { if(pool != null) { pool.shutdownNow(); } } public ExecutorService getCustomThreadPoolExecutor() { return this.pool; } private class CustomThreadFactory implements ThreadFactory { private AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1); System.out.println(threadName); t.setName(threadName); return t; } } private class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 记录异常 // 报警处理等 System.out.println("error............."); } } // 测试构造的线程池 public static void main(String[] args) { CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor(); // 1.初始化 exec.init(); ExecutorService pool = exec.getCustomThreadPoolExecutor(); for(int i=1; i<100; i++) { System.out.println("提交第" + i + "个任务!"); pool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("running====="); } }); } // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了 // exec.destory(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
八、扩展ThreadPoolExecutor
九、源码视角
从源码视角分析Executors、ThreadPoolExecutor、ExecuteService、Executor之间的关系,此处简单提及,读者可查看下一节“参考资料”以了解相关内容
(1)Executors
从Java5开始新增了Executors类,它有几个静态工厂方法用来创建线程池。
(2)Executor
Executor是一个接口,里面只有一个方法
public interface Executor { void execute(Runnable command); }
(3)ExecuteService
ExecuteService也是一个接口,其定义如下:
public interface ExecutorService extends Executor {...}
(4)ThreadPoolExecutor继承AbstractExecutorService,AbstractExecutorService实现ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService {...}
public abstract class AbstractExecutorService implements ExecutorService {...}
十、参考资料
本文仅简单阐述了Java并发中关于Executors及ThreadPoolExecutor的内容,此处贴出一些优质文章以供读者阅览
(1)http://blog.csdn.net/xiamizy/article/details/40781939
(2)http://www.cnblogs.com/dolphin0520/p/3932921.html
(3)http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html
标签:
原文地址:http://www.cnblogs.com/studyLog-share/p/5286290.html