标签:OLE 特性 util interface break cat 主线程 link 编写
要求:
JavaSE阶段性测试1:手写线程池需求
需求背景:由于频繁的自行创建和销毁线程非常的消耗资 源,并且难以控制线程的并发数量,所以项目组急需编写 一个线程池功能,能够自行管理线程的生命周期,并且根 据繁忙线程池程度对池中线程进行动态扩容。
现在项目经理抽象出线程池对象中包含有几个主要特性
1. 可以根据任务数量进行线程数量扩容,在空闲时线程
池中只保存coreSize数量的线程,而当coreSize个线程已 经全部在工作时,这个时候如果再添加任务进来,会新建 线程,接受任务,直到池中的线程数量达到maxSize时, 不在新建线程
2. 当线程数量达到maxSize时,并且所有线程都处于工
作状态,这时如果还有任务添加过来,则将其保存在一个 队列中(可以自定义队列,也可以自行调研并使用jdk中已 有的Queue子类)
3. 如果队列中的任务数量已经达到了最大值(队列满
了),这个时候又有任务添加过来,则执行丢弃策略(可 以什么都不做,也可以仍出异常,也可以让主线程中直接 调用run处理掉等等,最好使用策略模式而不是if-else)
4. 定义线程的空闲时间,如果线程池处理完了任务,线 程空闲了,则根据已定义新增线程的空闲时间,如果时间达到了,则回收线程直到线程数量达到coreSize数量为 止。(注意回收等操作的原子性,防止数据不一致)
思路,允许先在一个类中完成所有功能,再对代码进行面 向对象重构 也可以直接研读jdk线程池源码,参照写一份也可以(不建 议) 编写测试类,测试你的线程池是否功能正常
package com.test4; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; /** * @Author:pjq * @Date: 2019/9/23 9:30 */ public class SimpleThreadPool extends Thread { /** * 当前线程队列的大小 */ private int size; /** * 任务队列大小 */ private final int queueSize; /** * 默认任务队列大小 */ private final static int TASK_QUEUE_SIZE = 200; /** * 任务队列 正在执行任务的线程 */ private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>(); /** * 线程队列,可以取出空闲线程 */ private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>(); /** * 线程ID */ private static volatile int seq = 0; /** * 线程池是否被销毁 */ private volatile boolean destroyed = false; private int coreSize; private int maxSize; private int active; //队列满了丢异常 private DiscardPolicy discardPolicy = new MyAbortPolicy(); class MyAbortPolicy implements DiscardPolicy { @Override public void discard(String name) throws RuntimeException { throw new RuntimeException("超出任务...") ; } } interface DiscardPolicy { void discard(String name) throws RuntimeException; } public SimpleThreadPool() { this(4, 8, 12, TASK_QUEUE_SIZE); } public SimpleThreadPool(int coreSize, int active, int maxSize, int queue_size) { this.queueSize = queue_size; this.coreSize = coreSize; this.maxSize = maxSize; this.active = active; init(); } /** * 首先创建最小数量的线程池 */ private void init() { for (int i = 0; i < coreSize; i++) { createWorkTask(); } this.size = coreSize; this.start(); } /** * 创建空闲线程 加入线程池 */ private void createWorkTask() { WorkerTask task = new WorkerTask("pool_" + (seq++)); task.start(); THREAD_QUEUE.add(task);//添加任务 } /** * 向外提供方法 提交任务,如果任务大小超过线程池大小 200 则直接抛弃 */ public void submit(Runnable runnable) { synchronized (TASK_QUEUE) { if (TASK_QUEUE.size() > queueSize) { discardPolicy.discard("抛弃任务。。"); } TASK_QUEUE.addLast(runnable); TASK_QUEUE.notifyAll(); } } /** * 监控线程池,进行动态的扩容和缩小 */ @Override public void run() { while (!destroyed) { try { TimeUnit.MILLISECONDS.sleep(1000); System.err.printf("pool Min:%d,Active:%d,Max:%d,current:%d,QueueSize:%d\n", coreSize, active, maxSize, size, TASK_QUEUE.size()); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(5000); //判断当队列大小大于活动大小 创建任务(第一次扩容) if (TASK_QUEUE.size() > active && size < active) { for (int i = size; i < active; i++) { createWorkTask(); } System.out.println("active 池自动调整为 " + active); size = active; //判断当线程任务大小大于最大线程大小时,创建任务(创建任务) } else if (TASK_QUEUE.size() > maxSize && size < maxSize) { for (int i = size; i < maxSize; i++) { createWorkTask(); } System.out.println("max 池自动调整为 " + maxSize); size = maxSize; } synchronized (THREAD_QUEUE) { if (TASK_QUEUE.isEmpty() && size > active) { int release = size - active; for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator(); it.hasNext(); ) { if (release < 0) { break; } WorkerTask task = it.next(); task.interrupt(); it.remove(); release--; } size = active; System.out.println("池释放大小为" + size); } } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 任务有四种状态 */ private enum TaskState { FREE, RUNNING, BLOCKED, DEAD } /** * 内部类 将runnable封装为task执行 */ private static class WorkerTask extends Thread { private volatile TaskState taskState = TaskState.FREE; public WorkerTask(String name) { super(name); } public TaskState getTaskState() { return taskState; } /** * 运行任务队列中的任务 */ @Override public void run() { //循环标记 OUTER: while (this.taskState != TaskState.DEAD) { Runnable runnable; synchronized (TASK_QUEUE) { while (TASK_QUEUE.isEmpty()) { try { taskState = TaskState.BLOCKED; TASK_QUEUE.wait(); } catch (InterruptedException e) { break OUTER; } } runnable = TASK_QUEUE.removeFirst(); } if (runnable != null) { taskState = TaskState.RUNNING; runnable.run(); taskState = TaskState.FREE; } } } } public static void main(String[] args) throws InterruptedException { SimpleThreadPool threadPool = new SimpleThreadPool(); IntStream.range(0, 200).forEach(i -> threadPool.submit(() -> { System.out.println("任务" + Thread.currentThread().getName() + " 接收... " + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务" + Thread.currentThread().getName() + " 关闭... " + i); })); //threadPool.shutDown(); /* for ( int i = 0; i < 203; i++) { threadPool.submit(()-> { System.out.println("运行的任务池" + Thread.currentThread().getName()+" "); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("运行任务" + Thread.currentThread().getName() + "关"); }); // System.out.println("----------------------------"); } */ } }
标签:OLE 特性 util interface break cat 主线程 link 编写
原文地址:https://www.cnblogs.com/pjqq/p/11581606.html