标签:
了解线程池的知识后,写个线程池实例,熟悉多线程开发,建议看jdk线程池源码,跟大师比,才知道差距啊O(∩_∩)O
1 package thread.pool2; 2 3 import java.util.LinkedList; 4 5 public class ThreadPool { 6 //最大线程数 7 private int maxCapacity; 8 //初始线程数 9 private int initCapacity; 10 //当前线程数 11 private int currentCapacity; 12 //线程池需要执行的任务 13 private LinkedList<Task> tasks; 14 //当前处于等待的线程数 15 private int waitThreadNum = 0; 16 //线程池中线程数超过初始数量时,此时有线程执行完任务,但是没有后续的任务执行,则会等待一段时间后,该线程才销毁 17 //destroyTime小于或等于0时,线程立即消费,大于0,则等待设置的时间 18 private int destroyTime = 0; 19 20 public ThreadPool(int initCapacity,int maxCapacity, int destroyTime) { 21 if(initCapacity > maxCapacity) { 22 //初始线程数不能超过最大线程数,当然此处可以抛出异常,提示不允许这么设置 23 initCapacity = maxCapacity; 24 } 25 this.maxCapacity = maxCapacity; 26 this.initCapacity = initCapacity; 27 this.currentCapacity = initCapacity; 28 this.tasks = new LinkedList<Task>(); 29 this.waitThreadNum = initCapacity; 30 this.destroyTime = destroyTime; 31 } 32 /** 33 * 向线程池中添加任务,如果线程数不够,则增加线程数,但线程数总量不能超过给定的最大线程数 34 * @param task 35 */ 36 public synchronized void addTask(Task task) { 37 tasks.add(task); 38 addThread(); 39 notifyAll(); 40 } 41 /** 42 * 从线程池中取出任务,如果没有任务,则当前线程处于等待状态 43 * @return 44 * @throws InterruptedException 45 */ 46 public synchronized Task getTask() throws InterruptedException { 47 while(tasks.isEmpty()) { 48 wait(); 49 } 50 //取出第一个任务的同时将第一个任务移除 51 return tasks.pollFirst(); 52 } 53 /** 54 * 判断线程池中任务列表是否为空 55 * @return 56 */ 57 public synchronized boolean isEmpty() { 58 return tasks.isEmpty(); 59 } 60 /** 61 * 活跃线程数加1 62 */ 63 public synchronized void addWaitThreadNum(int num) { 64 waitThreadNum += num; 65 } 66 /** 67 * 活跃线程数减1 68 */ 69 public synchronized void reduceWaitThreadNum(int num) { 70 waitThreadNum -= num; 71 } 72 73 /** 74 * 启动线程池 75 */ 76 public void execute() { 77 System.out.println(initCapacity); 78 for(int i = 0; i < initCapacity; i++) { 79 (new Thread(new InnerThread(this, "thread"+ i))).start(); 80 } 81 } 82 /** 83 * 如果当前线程数大于初始线程数,则关闭当前线程,否则当前线程处于等待状态 84 * @return 85 * @throws InterruptedException 86 */ 87 public synchronized boolean waitOrClose(int tmp) throws InterruptedException { 88 System.out.println(currentCapacity + ":" + initCapacity); 89 //线程退出前,等待一段时间,防止线程频繁创建和销毁线程 90 if(destroyTime > 0) { 91 wait(destroyTime); 92 } 93 if(currentCapacity > initCapacity && tasks.isEmpty()) { 94 currentCapacity--; 95 System.out.println("任务执行完后,当前线程数:" + currentCapacity); 96 return false; 97 } 98 System.out.println("线程等待结束"); 99 addWaitThreadNum(tmp); 100 wait(); 101 return true; 102 } 103 104 /** 105 * 当线程池内线程数不够时,如果有任务在等待处理,同时当前线程都处于非等待状态, 106 * 则增加线程池中线程数,但不能超过线程池中最大线程数 107 */ 108 public synchronized void addThread() { 109 System.out.println("当前线程数:" + currentCapacity + "最大线程数:" + maxCapacity + "等待线程数" + waitThreadNum); 110 if(currentCapacity < maxCapacity && waitThreadNum == 0) { 111 //每添加一个线程,当前线程数加1 112 currentCapacity++; 113 //每添加一个线程,相当于线程池中多了一个等待的线程 114 waitThreadNum++; 115 System.out.println("当前线程数为:" + currentCapacity); 116 new Thread(new InnerThread(this, "thread" + (currentCapacity-1))).start(); 117 } 118 } 119 /** 120 * 线程池中单个线程对象 121 * @author yj 122 * 123 */ 124 private class InnerThread implements Runnable { 125 126 private ThreadPool threadPool; 127 private String threadName; 128 129 public InnerThread(ThreadPool threadPool, String threadName) { 130 this.threadPool = threadPool; 131 this.threadName = threadName; 132 } 133 134 @Override 135 public void run() { 136 try { 137 while(true){ 138 int addWait = 0; 139 int resuceWait = 1; 140 //不等于空,则处理任务 141 while(!threadPool.isEmpty()) { 142 threadName = Thread.currentThread().getName(); 143 reduceWaitThreadNum(resuceWait); 144 Task task = threadPool.getTask(); 145 task.execute(threadName); 146 try { 147 Thread.sleep(9000); 148 } catch (InterruptedException e) { 149 e.printStackTrace(); 150 } 151 System.out.println(threadName + "对"+task.getTaskName()+"+任务进行了处理"); 152 //只有处理任务后回到等待状态的线程才将waitThreadNum加1 153 addWait = 1; 154 //如果不跳出循环,则等待线程数不减少 155 resuceWait = 0; 156 } 157 //等于空,则等待任务或关闭当前线程 158 if(threadPool.waitOrClose(addWait)) { 159 System.out.println(threadName + "处于等待状态"); 160 continue; 161 } 162 //关闭线程 163 break; 164 } 165 } catch (InterruptedException e) { 166 e.printStackTrace(); 167 } 168 } 169 } 170 }
1 package thread.pool2; 2 3 public class Task{ 4 5 private String taskName; 6 7 public String getTaskName() { 8 return taskName; 9 } 10 11 public Task(String taskName) { 12 this.taskName = taskName; 13 } 14 15 public void execute(String threadName) { 16 System.out.println(threadName + "开始执行任务为" + taskName); 17 /*try { 18 Thread.sleep(9000); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 }*/ 22 System.out.println(threadName + "执行" + taskName + "任务完成"); 23 } 24 25 }
1 package thread.pool2; 2 3 public class ThreadPoolTest { 4 5 public static void main(String[] args) { 6 ThreadPool threadPool = new ThreadPool(3, 10, 1100); 7 threadPool.execute(); 8 for(int i = 0; i < 50; i++) { 9 int random = (int) (Math.random() * 1000); 10 threadPool.addTask(new Task("task"+random)); 11 /*try { 12 //每个1秒向线程池中添加任务 13 Thread.sleep(1000); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 }*/ 17 } 18 } 19 20 }
标签:
原文地址:http://www.cnblogs.com/yuyuj/p/4524379.html