标签:rri ring 返回 collect class nts lis arraylist some
实现了一个简化版的线程池。
实现线程池的关键有两个:一是阻塞队列,用于任务的存取;二是内部的线程对象如何持续性地执行任务,并在空闲时被回收。
线程池代码:
1 package learnConcurrent; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.LinkedList; 6 import java.util.List; 7 import java.util.concurrent.ArrayBlockingQueue; 8 import java.util.concurrent.BlockingQueue; 9 import java.util.concurrent.Callable; 10 import java.util.concurrent.ExecutionException; 11 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.Future; 13 import java.util.concurrent.TimeUnit; 14 import java.util.concurrent.TimeoutException; 15 import java.util.concurrent.atomic.AtomicBoolean; 16 import java.util.concurrent.atomic.AtomicInteger; 17 import java.util.concurrent.locks.ReentrantLock; 18 19 public class MyThreadPool implements ExecutorService{ 20 //线程队列 21 private List<Worker> workers; 22 //任务队列 23 private BlockingQueue<Runnable> rQueue; 24 //线程池核心大小 25 private int corePoolSize; 26 //线程池最大大小 27 private int maxPoolSize; 28 //空闲线程最长存活时间 29 private int keepAliveTime = 5; 30 31 private static final int ALIVE = 0; 32 33 private static final int SHUTDOMN = 1; 34 35 private AtomicInteger state = new AtomicInteger(ALIVE); 36 37 private ReentrantLock lock = new ReentrantLock(); 38 39 public MyThreadPool(int corePoolSize, int maxPoolSize){ 40 this.corePoolSize = corePoolSize; 41 this.maxPoolSize = maxPoolSize; 42 43 this.workers = new LinkedList<Worker>(); 44 //阻塞队列,最大容量为maxPoolSize 45 this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true); 46 } 47 48 @Override 49 public void execute(Runnable command) { 50 //FIXME size在获取时和判断时 可能发生改变 51 lock.lock(); 52 int size = workers.size(); 53 if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程 54 addWorker(); 55 }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程 56 addWorker(); 57 } 58 lock.unlock(); 59 60 if(!isShutdown()){ 61 rQueue.offer(command); 62 } 63 } 64 65 @Override 66 public void shutdown() { 67 //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收 68 //缺点时空闲的线程资源得不到立即释放 69 state.set(SHUTDOMN); 70 } 71 72 /** 73 * 
立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表 74 */ 75 @Override 76 public List<Runnable> shutdownNow() { 77 if(isShutdown()) 78 return null; 79 state.set(SHUTDOMN); 80 lock.lock(); 81 List<Runnable> restRunnable = new ArrayList<Runnable>(); 82 while(!rQueue.isEmpty()){ 83 restRunnable.add(rQueue.poll()); 84 } 85 for(Worker w : workers){ 86 w.interrupt(); 87 } 88 lock.unlock(); 89 return restRunnable; 90 } 91 92 @Override 93 public boolean isShutdown() { 94 return state.get() == ALIVE; 95 } 96 97 @Override 98 public boolean isTerminated() { 99 return isShutdown() && rQueue.isEmpty(); 100 } 101 102 @Override 103 public boolean awaitTermination(long timeout, TimeUnit unit) 104 throws InterruptedException { 105 // TODO Auto-generated method stub 106 return false; 107 } 108 109 @Override 110 public <T> Future<T> submit(Callable<T> task) { 111 // TODO Auto-generated method stub 112 return null; 113 } 114 115 @Override 116 public <T> Future<T> submit(Runnable task, T result) { 117 // TODO Auto-generated method stub 118 return null; 119 } 120 121 @Override 122 public Future<?> submit(Runnable task) { 123 return null; 124 } 125 126 @Override 127 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 128 throws InterruptedException { 129 return null; 130 } 131 132 @Override 133 public <T> List<Future<T>> invokeAll( 134 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 135 throws InterruptedException { 136 return null; 137 } 138 139 @Override 140 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 141 throws InterruptedException, ExecutionException { 142 return null; 143 } 144 145 @Override 146 public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, 147 long timeout, TimeUnit unit) throws InterruptedException, 148 ExecutionException, TimeoutException { 149 return null; 150 } 151 152 private Runnable getTask(){ 153 Runnable r = null; 154 try { 155 r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS); 156 } catch (InterruptedException e) { 157 // TODO Auto-generated catch block 158 e.printStackTrace(); 159 } 160 return r; 161 } 162 163 private void addWorker(){ 164 Worker w = new Worker(); 165 w.start(); 166 lock.lock(); 167 workers.add(w); 168 lock.unlock(); 169 } 170 171 private void removeWorker(Worker w){ 172 lock.lock(); 173 workers.remove(w); 174 lock.unlock(); 175 } 176 177 class Worker extends Thread{ 178 179 private AtomicBoolean isAlive = new AtomicBoolean(true); 180 181 private Runnable task; 182 183 184 @Override 185 public void run() { 186 while(isAlive.get()){ 187 //阻塞一定时间,超时则回收该线程 188 task = getTask(); 189 if(task != null){ 190 task.run(); 191 }else{ 192 isAlive.set(false); 193 194 } 195 task = null; 196 } 197 System.out.println("remove worker"); 198 removeWorker(this); 199 } 200 201 } 202 203 204 }
测试代码:
1 package learnConcurrent; 2 3 4 public class ThreadPoolTest { 5 static int taskNo = 0; 6 public static void main(String[] args) throws InterruptedException { 7 MyThreadPool pool = new MyThreadPool(2, 5); 8 9 for(int i=0; i< 50; i++){ 10 Task task = new Task(taskNo++); 11 pool.execute(task); 12 Thread.sleep((int)(Math.random() * 1000)); 13 } 14 15 } 16 17 } 18 19 class Task implements Runnable{ 20 String str; 21 public Task(int taskNo){ 22 str = "TaskNo:" + taskNo; 23 } 24 @Override 25 public void run() { 26 System.out.println(str + " start work "); 27 //DO SOMETHING 28 try { 29 Thread.sleep((int)(Math.random() * 1000)); 30 } catch (InterruptedException e) { 31 // TODO Auto-generated catch block 32 e.printStackTrace(); 33 } 34 35 System.out.println(str + " done "); 36 } 37 38 }
虽然实现了ExecutorService接口,但是只实现了其中的几个方法,设计上也可能有未考虑到的问题。
测试代码也很简陋,仅供参考。
标签:rri ring 返回 collect class nts lis arraylist some
原文地址:http://www.cnblogs.com/insaneXs/p/7508328.html