码迷,mamicode.com
首页 > 其他好文 > 详细

任务执行和任务处理的异步执行

时间:2015-07-19 01:21:32      阅读:134      评论:0      收藏:0      [点我收藏+]

标签:

最近在看《深入剖析Tomcat》,在第四章有讲解Tomcat的默认连接器,和这个连接器有着密切关系的是一个处理器。一个连接器可以对应多个处理器。连接器的作用是从客户端的请求中提取出Socket,然后把Socket传递给处理器来处理用户的请求。需要注意的是连接器的关注点是在接收请求,分派给他下游的处理器去执行,在把Socket传递下去后,连接器在这次请求中的任务已经完成。这里可以看到,连接器和处理器他们是异步进行的,异步的处理为多线程的执行提供了可能。

在这里我要说的是,在看了Tomcat的源码实现后,有了点自己的想法,写出了两个相对类似的类,以自己以后参考以及分享给大家。

 

 1 public class AsyncCaller implements Runnable{
 2 
 3     private static Logger logger = Logger.getLogger(AsyncCaller.class);
 4     
 5     private Stack<AsyncExecutor> executors = new Stack<AsyncExecutor>();
 6     private boolean shutdown = false;
 7     
 8     private static int MAX_THREAD_NUMBER = 10;
 9     
10     public static void main(String[] args) {
11         AsyncCaller ac = new AsyncCaller();
12         ac.init();
13         new Thread(ac).start();
14     }
15 
16     public void init() {
17         for( int i = 0; i < MAX_THREAD_NUMBER; i++ ) {
18             AsyncExecutor ae = new AsyncExecutor(i, this);
19             ae.threadStart();
20             executors.add(ae);
21         }
22     }
23     
24     public void recycle(AsyncExecutor ae) {
25         this.executors.push(ae);
26     }
27     
28     @Override
29     public void run() {
30         while( !shutdown ) {
31             logger.info("Input:");
32             BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
33             String input = null;
34             try {
35                  input = br.readLine();
36             } catch (IOException e) {
37                 e.printStackTrace();
38             }
39             
40             if( "shutdown".equals(input) ) {
41                 shutdown = true;
42             } else {
43                 int baseNumber = 0;
44                 try {
45                     baseNumber = Integer.parseInt(input);
46                 } catch (Exception e) {
47                     e.printStackTrace();
48                 }
49                 
50                 if( !executors.isEmpty() ) {
51                     AsyncExecutor ae = executors.pop();
52                     ae.assignBaseNumber(baseNumber);
53                 } else {
54                     logger.info("no more executor, waiting for next time");
55                 }
56 
57             }
58 
59         }
60     }
61 
62 }

 

 

  1 public class AsyncExecutor implements Runnable{
  2 
  3     private static Logger logger = Logger.getLogger(AsyncExecutor.class);
  4     
  5     private long result = 0l;
  6     private boolean available = false;
  7     
  8     private long baseNumber;
  9     private int threadIndex;
 10     
 11     private boolean shutdown = false;
 12     private AsyncCaller asyncCaller;
 13     
 14     public AsyncExecutor(int threadIndex, AsyncCaller ac) {
 15         this.threadIndex = threadIndex;
 16         this.asyncCaller = ac;
 17     }
 18     
 19     public void threadStart() {
 20         Thread t = new Thread(this, String.valueOf(threadIndex));
 21         t.start();
 22     }
 23     
 24     /*
 25      * 所有本类的实例在新建时,就通过调用Thread.start()方法
 26      * 来开始执行操作,在进入方法awaitBaseNumber()时,每个
 27      * 线程都在等待一个状态位(available)的变化,状态位的变化
 28      * 表明处理器线程等待的数据已经准备好,接下来,这个线程可以
 29      * 来处理真正的业务逻辑,待处理完毕后,把当前的线程放回线程池,
 30      * 以便重复利用
 31      */
 32     @Override
 33     public void run() {
 34         while( !shutdown ) {
 35             // 等待期望的数据,此时线程在此处一直处于停滞的状态(Blocking),
 36             // 直到处理器线程(AsyncCaller)准备好数据,然后通过改变状态位,available
 37             // 让本线程重新恢复运行的状态
 38             long baseNumber = awaitBaseNumber();
 39             // 处理真正的逻辑
 40             process(baseNumber);
 41             //把当前线程放回线程池,以便重复利用
 42             this.asyncCaller.recycle(this);
 43         }
 44 
 45     }
 46     
 47     /*
 48      * 正是assignBaseNumber()方法和awaitBaseNumber()方法相互影响,
 49      * 相互制约,才使得多线程异步执行成为可能
 50      */
 51     public synchronized void assignBaseNumber( long baseNumber ) {
 52         
 53         while( available ) {
 54             try {
 55                 wait();
 56             } catch (InterruptedException e) {
 57                 e.printStackTrace();
 58             }
 59         }
 60         
 61         this.baseNumber = baseNumber;
 62         available = true;
 63         notifyAll();
 64         
 65     }
 66     
 67     public synchronized long awaitBaseNumber() {
 68         
 69         while( !available ) {
 70             try {
 71                 wait();
 72             } catch (InterruptedException e) {
 73                 e.printStackTrace();
 74             }
 75         }
 76 
 77         long baseNumber = this.baseNumber;
 78         
 79         available = false;
 80         notifyAll();
 81         return  baseNumber;
 82     }
 83     
 84     private void process(long baseNumber) {
 85         logger.info("Thread:" + Thread.currentThread().getName() + " base number:" + baseNumber);
 86         timeCostingCalculation(baseNumber);
 87     }
 88     
 89     private void timeCostingCalculation(long baseNumber) {
 90         
 91         result = baseNumber;
 92         for( int i = 0; i < 10_000; i++ ) {
 93             result = result + 1;
 94             try {
 95                 Thread.sleep(1);
 96             } catch (InterruptedException e) {
 97                 e.printStackTrace();
 98             }
 99         }
100         logger.info("Thread:" + Thread.currentThread().getName() + " result:" + result);
101     }
102 
103     
104 }

 

 

参考书籍

1,深入剖析Tomcat

任务执行和任务处理的异步执行

标签:

原文地址:http://www.cnblogs.com/shengyang/p/4657925.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!