标签:
最近在看《深入剖析Tomcat》,在第四章有讲解Tomcat的默认连接器,和这个连接器有着密切关系的是一个处理器。一个连接器可以对应多个处理器。连接器的作用是从客户端的请求中提取出Socket,然后把Socket传递给处理器来处理用户的请求。需要注意的是连接器的关注点是在接收请求,分派给他下游的处理器去执行,在把Socket传递下去后,连接器在这次请求中的任务已经完成。这里可以看到,连接器和处理器他们是异步进行的,异步的处理为多线程的执行提供了可能。
在这里我要说的是,在看了Tomcat的源码实现后,有了点自己的想法,写出了两个相对类似的类,以自己以后参考以及分享给大家。
1 public class AsyncCaller implements Runnable{ 2 3 private static Logger logger = Logger.getLogger(AsyncCaller.class); 4 5 private Stack<AsyncExecutor> executors = new Stack<AsyncExecutor>(); 6 private boolean shutdown = false; 7 8 private static int MAX_THREAD_NUMBER = 10; 9 10 public static void main(String[] args) { 11 AsyncCaller ac = new AsyncCaller(); 12 ac.init(); 13 new Thread(ac).start(); 14 } 15 16 public void init() { 17 for( int i = 0; i < MAX_THREAD_NUMBER; i++ ) { 18 AsyncExecutor ae = new AsyncExecutor(i, this); 19 ae.threadStart(); 20 executors.add(ae); 21 } 22 } 23 24 public void recycle(AsyncExecutor ae) { 25 this.executors.push(ae); 26 } 27 28 @Override 29 public void run() { 30 while( !shutdown ) { 31 logger.info("Input:"); 32 BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 33 String input = null; 34 try { 35 input = br.readLine(); 36 } catch (IOException e) { 37 e.printStackTrace(); 38 } 39 40 if( "shutdown".equals(input) ) { 41 shutdown = true; 42 } else { 43 int baseNumber = 0; 44 try { 45 baseNumber = Integer.parseInt(input); 46 } catch (Exception e) { 47 e.printStackTrace(); 48 } 49 50 if( !executors.isEmpty() ) { 51 AsyncExecutor ae = executors.pop(); 52 ae.assignBaseNumber(baseNumber); 53 } else { 54 logger.info("no more executor, waiting for next time"); 55 } 56 57 } 58 59 } 60 } 61 62 }
1 public class AsyncExecutor implements Runnable{ 2 3 private static Logger logger = Logger.getLogger(AsyncExecutor.class); 4 5 private long result = 0l; 6 private boolean available = false; 7 8 private long baseNumber; 9 private int threadIndex; 10 11 private boolean shutdown = false; 12 private AsyncCaller asyncCaller; 13 14 public AsyncExecutor(int threadIndex, AsyncCaller ac) { 15 this.threadIndex = threadIndex; 16 this.asyncCaller = ac; 17 } 18 19 public void threadStart() { 20 Thread t = new Thread(this, String.valueOf(threadIndex)); 21 t.start(); 22 } 23 24 /* 25 * 所有本类的实例在新建时,就通过调用Thread.start()方法 26 * 来开始执行操作,在进入方法awaitBaseNumber()时,每个 27 * 线程都在等待一个状态位(available)的变化,状态位的变化 28 * 表明处理器线程等待的数据已经准备好,接下来,这个线程可以 29 * 来处理真正的业务逻辑,待处理完毕后,把当前的线程放回线程池, 30 * 以便重复利用 31 */ 32 @Override 33 public void run() { 34 while( !shutdown ) { 35 // 等待期望的数据,此时线程在此处一直处于停滞的状态(Blocking), 36 // 直到处理器线程(AsyncCaller)准备好数据,然后通过改变状态位,available 37 // 让本线程重新恢复运行的状态 38 long baseNumber = awaitBaseNumber(); 39 // 处理真正的逻辑 40 process(baseNumber); 41 //把当前线程放回线程池,以便重复利用 42 this.asyncCaller.recycle(this); 43 } 44 45 } 46 47 /* 48 * 正是assignBaseNumber()方法和awaitBaseNumber()方法相互影响, 49 * 相互制约,才使得多线程异步执行成为可能 50 */ 51 public synchronized void assignBaseNumber( long baseNumber ) { 52 53 while( available ) { 54 try { 55 wait(); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 } 60 61 this.baseNumber = baseNumber; 62 available = true; 63 notifyAll(); 64 65 } 66 67 public synchronized long awaitBaseNumber() { 68 69 while( !available ) { 70 try { 71 wait(); 72 } catch (InterruptedException e) { 73 e.printStackTrace(); 74 } 75 } 76 77 long baseNumber = this.baseNumber; 78 79 available = false; 80 notifyAll(); 81 return baseNumber; 82 } 83 84 private void process(long baseNumber) { 85 logger.info("Thread:" + Thread.currentThread().getName() + " base number:" + baseNumber); 86 timeCostingCalculation(baseNumber); 87 } 88 89 private void timeCostingCalculation(long baseNumber) { 90 91 result = baseNumber; 92 for( int i = 0; i < 10_000; i++ ) { 93 result = result + 1; 94 try { 95 Thread.sleep(1); 96 } catch (InterruptedException e) { 97 e.printStackTrace(); 98 } 99 } 100 logger.info("Thread:" + Thread.currentThread().getName() + " result:" + result); 101 } 102 103 104 }
参考书籍
1,深入剖析Tomcat
标签:
原文地址:http://www.cnblogs.com/shengyang/p/4657925.html