一. 线程池简介
1. 线程池的概念:
线程池就是首先创建一些线程,它们的集合称为线程池。
2. 使用线程池的好处
a) 降低资源的消耗。使用线程池不用频繁的创建线程和销毁线程
b) 提高响应速度,任务:T1创建线程时间,T2任务执行时间,T3线程销毁时间,线程池空闲的时候可以去执行T1和T2,从而提高响应
c) 提高线程的可管理性。
使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。
3. 线程池的工作机制
2.1 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。
2.1 一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
4. 使用线程池的原因:
多线程运行时间,系统不断的启动和关闭新线程,成本非常高,会过渡消耗系统资源,以及过渡切换线程的危险,从而可能导致系统资源的崩溃。这时,线程池就是最好的选择了。
5. 线程池的主要处理流程
说明:
a)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程b。
b)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程c。
c)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
6. ThreadPoolExecutor执行execute()方法的示意
执行execute()方法是对第5点中的线程池的主要处理流程的更深层次的说明
a)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
b)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
c)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
d)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
7.线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
TimeUnit
keepAliveTime的时间单位
workQueue
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名
Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”
8.RejectedExecutionHandler(饱和策略)
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
9.关闭线程池
shutDown():interrupt方法来终止线程
shutDownNow() 尝试停止所有正在执行的线程
10. 合理地配置线程池
线程数配置:
任务:计算密集型,IO密集型,混合型
计算密集型适合配置的线程数=计算机的cpu数或计算机的cpu数+1(应付页缺失)
IO密集型适合配置的线程数=计算机的cpu数*2
混合型适合配置的线程数,拆分成计算密集型,IO密集型
Runtime.getRuntime().availableProcessors();当前机器中的cpu核心个数
队列的选择:
尽量有界队列,不要使用无界队列
二、使用jdk中线程池的案例
1 import java.util.Random; 2 import java.util.concurrent.ArrayBlockingQueue; 3 import java.util.concurrent.ThreadPoolExecutor; 4 import java.util.concurrent.TimeUnit; 5 6 /** 7 * 使用jdk中线程池的案例 8 */ 9 public class UseThreadPool { 10 11 static class MyTask implements Runnable { 12 13 private String name; 14 15 16 public MyTask(String name) { 17 this.name = name; 18 } 19 20 public String getName() { 21 return name; 22 } 23 24 @Override 25 public void run() {// 执行任务 26 try { 27 Random r = new Random(); 28 Thread.sleep(r.nextInt(1000)+2000); 29 } catch (InterruptedException e) { 30 System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:" 31 +Thread.currentThread().isInterrupted()); 32 } 33 System.out.println("任务 " + name + " 完成"); 34 } 35 } 36 37 public static void main(String[] args) { 38 //创建线程池 39 ThreadPoolExecutor threadPoolExecutor = 40 new ThreadPoolExecutor(2,4,60, 41 TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(10)); 42 //往线程池里面提交6个线程去执行 43 for(int i =0;i<=5;i++){ 44 MyTask task = new MyTask("Task_"+i); 45 System.out.println("A new task will add:"+task.getName()); 46 threadPoolExecutor.execute(task); 47 48 } 49 //关闭线程池 50 threadPoolExecutor.shutdown(); 51 } 52 53 54 }
三、实现自己的一个线程池
手写的线程池MyThreadPool
1 import java.util.LinkedList; 2 import java.util.List; 3 4 /** 5 * 实现自己的一个线程池 6 */ 7 public class MyThreadPool { 8 9 //默认的线程个数 10 private int work_num = 5; 11 12 //线程的容器 13 private WorkThread[] workThreads; 14 15 //任务队列 16 private List<Runnable> taskQueue = new LinkedList<>(); 17 18 public MyThreadPool(int work_num) { 19 this.work_num = work_num; 20 workThreads = new WorkThread[work_num]; 21 for(int i=0;i<work_num;i++){ 22 workThreads[i] = new WorkThread(); 23 workThreads[i].start(); 24 } 25 } 26 27 //提交任务的接口 28 public void execute(Runnable task){ 29 synchronized (taskQueue){ 30 taskQueue.add(task); 31 taskQueue.notify(); 32 } 33 } 34 35 //销毁线程池 36 public void destroy(){ 37 System.out.println("ready stop pool...."); 38 for(int i=0;i<work_num;i++){ 39 workThreads[i].stopWorker(); 40 workThreads[i] = null;//加速垃圾回收 41 } 42 taskQueue.clear(); 43 } 44 45 //工作线程 46 private class WorkThread extends Thread{ 47 48 private volatile boolean on = true; 49 50 public void run(){ 51 Runnable r = null; 52 try{ 53 while(on&&!isInterrupted()){ 54 synchronized (taskQueue){ 55 //任务队列中无任务,工作线程等待 56 while(on&&!isInterrupted()&&taskQueue.isEmpty()){ 57 taskQueue.wait(1000); 58 } 59 //任务队列中有任务,拿任务做事 60 if(on&&!isInterrupted()&&!taskQueue.isEmpty()){ 61 r = taskQueue.remove(0); 62 } 63 } 64 if (r!=null){ 65 System.out.println(getId()+" ready execute...."); 66 r.run(); 67 } 68 //加速垃圾回收 69 r = null; 70 } 71 72 }catch(InterruptedException e){ 73 System.out.println(Thread.currentThread().getId()+" is Interrupted"); 74 } 75 } 76 77 public void stopWorker(){ 78 on = false; 79 interrupt(); 80 } 81 82 } 83 84 }
测试手写实现的线程池TestMyThreadPool
1 import java.util.Random; 2 3 /** 4 * 测试手写实现的线程池 5 */ 6 public class TestMyThreadPool { 7 public static void main(String[] args) throws InterruptedException { 8 // 创建3个线程的线程池 9 MyThreadPool t = new MyThreadPool(3); 10 t.execute(new MyTask("testA")); 11 t.execute(new MyTask("testB")); 12 t.execute(new MyTask("testC")); 13 t.execute(new MyTask("testD")); 14 t.execute(new MyTask("testE")); 15 System.out.println(t); 16 Thread.sleep(3000); 17 t.destroy();// 所有线程都执行完成才destory 18 System.out.println(t); 19 } 20 21 // 任务类 22 static class MyTask implements Runnable { 23 24 private String name; 25 private Random r = new Random(); 26 27 public MyTask(String name) { 28 this.name = name; 29 } 30 31 public String getName() { 32 return name; 33 } 34 35 @Override 36 public void run() {// 执行任务 37 try { 38 Thread.sleep(r.nextInt(1000)+2000); 39 } catch (InterruptedException e) { 40 System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:" 41 +Thread.currentThread().isInterrupted()); 42 } 43 System.out.println("任务 " + name + " 完成"); 44 } 45 } 46 }