标签:schedule queue 固定 bubuko 自定义 调用 cat 出队 time
在业务场景中, 如果一个对象创建销毁开销比较大, 那么此时建议池化对象进行管理.
例如线程, jdbc连接等等, 在高并发场景中, 如果可以复用之前销毁的对象, 那么系统效率将大大提升.
另外一个好处是可以设定池化对象的上限, 例如预防创建线程数量过多导致系统崩溃的场景.
下文主要从以下几个角度讲解:
创建线程池
我们可以通过自定义ThreadPoolExecutor或者jdk内置的Executors来创建一系列的线程池
上述几种都是通过new ThreadPoolExecutor()来实现的, 构造函数源码如下:
1 /** 2 * @param corePoolSize 池内核心线程数量, 超出数量的线程会进入阻塞队列 3 * @param maximumPoolSize 最大可创建线程数量 4 * @param keepAliveTime 线程存活时间 5 * @param unit 存活时间的单位 6 * @param workQueue 线程溢出后的阻塞队列 7 */ 8 public ThreadPoolExecutor(int corePoolSize, 9 int maximumPoolSize, 10 long keepAliveTime, 11 TimeUnit unit, 12 BlockingQueue<Runnable> workQueue) { 13 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); 14 } 15 16 public static ExecutorService newFixedThreadPool(int nThreads) { 17 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 18 } 19 20 public static ExecutorService newSingleThreadExecutor() { 21 return new Executors.FinalizableDelegatedExecutorService 22 (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); 23 } 24 25 public static ExecutorService newCachedThreadPool() { 26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 27 } 28 29 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 30 return new ScheduledThreadPoolExecutor(corePoolSize); 31 } 32 33 public ScheduledThreadPoolExecutor(int corePoolSize) { 34 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue()); 35 }
提交任务
直接调用executorService.execute(runnable)或者submit(runnable)即可,
execute和submit的区别在于submit会返回Future来获取任何执行的结果.
我们看下newScheduledThreadPool的使用示例.
1 public class SchedulePoolDemo { 2 3 public static void main(String[] args){ 4 ScheduledExecutorService service = Executors.newScheduledThreadPool(10); 5 // 如果前面的任务没有完成, 调度也不会启动 6 service.scheduleAtFixedRate(new Runnable() { 7 @Override 8 public void run() { 9 try { 10 Thread.sleep(2000); 11 // 每两秒打印一次. 12 System.out.println(System.currentTimeMillis()/1000); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 } 17 }, 0, 2, TimeUnit.SECONDS); 18 } 19 }
潜在宕机风险
使用Executors来创建要注意潜在宕机风险.其返回的线程池对象的弊端如下:
综上所述, 在可能有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来创建线程池, 具体构造函数配置见下文.
线程池大小配置
一般根据任务类型进行区分, 假设CPU为N核
自定义阻塞队列BlockingQueue
主要存放等待执行的线程, ThreadPoolExecutor中支持自定义该队列来实现不同的排队队列.
回调接口
线程池提供了一些回调方法, 具体使用如下所示.
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) { 2 3 @Override 4 protected void beforeExecute(Thread t, Runnable r) { 5 System.out.println("准备执行任务: " + r.toString()); 6 } 7 8 @Override 9 protected void afterExecute(Runnable r, Throwable t) { 10 System.out.println("结束任务: " + r.toString()); 11 } 12 13 @Override 14 protected void terminated() { 15 System.out.println("线程池退出"); 16 } 17 };
可以在回调接口中, 对线程池的状态进行监控, 例如任务执行的最长时间, 平均时间, 最短时间等等, 还有一些其他的属性如下:
自定义拒绝策略
线程池满负荷运转后, 因为时间空间的问题, 可能需要拒绝掉部分任务的执行.
jdk提供了RejectedExecutionHandler接口, 并内置了几种线程拒绝策略
使用方式也很简单, 直接传参给ThreadPool
1 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 2 new SynchronousQueue<Runnable>(), 3 Executors.defaultThreadFactory(), 4 new RejectedExecutionHandler() { 5 @Override 6 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 7 System.out.println("reject task: " + r.toString()); 8 } 9 });
自定义ThreadFactory
线程工厂用于创建池里的线程. 例如在工厂中都给线程setDaemon(true), 这样程序退出的时候, 线程自动退出.
或者统一指定线程优先级, 设置名称等等.
1 class NamedThreadFactory implements ThreadFactory { 2 private static final AtomicInteger threadIndex = new AtomicInteger(0); 3 private final String baseName; 4 private final boolean daemon; 5 6 public NamedThreadFactory(String baseName) { 7 this(baseName, true); 8 } 9 10 public NamedThreadFactory(String baseName, boolean daemon) { 11 this.baseName = baseName; 12 this.daemon = daemon; 13 } 14 15 public Thread newThread(Runnable runnable) { 16 Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement()); 17 thread.setDaemon(this.daemon); 18 return thread; 19 } 20 }
关闭线程池
跟直接new Thread不一样, 局部变量的线程池, 需要手动关闭, 不然会导致线程泄漏问题.
默认提供两种方式关闭线程池.
原文链接:https://www.cnblogs.com/xdecode/p/9119794.html
标签:schedule queue 固定 bubuko 自定义 调用 cat 出队 time
原文地址:https://www.cnblogs.com/jstarseven/p/9225665.html