标签:camel esb async processor asyncprocessor
Camel支持一种更复杂的异步的处理模型,异步处理器实现一个继承自Processor接口的AsyncProcessor接口,使用异步Processor的优点:
a.异步Processor不会因等待阻塞调用而耗尽线程,这样在处理同样工作量的情况下,通过减少线程的数量可以增加系统的伸缩性
b.使用异步Processor,可以将路由分阶段处理,不同的线程池处理其相应的路由阶段,这就意味着路由可以并行处理。
缺点:实现异步的Processor要比同步的Processor复杂得多。
异步Processor与同步Processor的区别:
a.必须提供一个AsyncCallback对象,该对象在exchange处理完成后被通知
b.在异步Processor处理exchange的时候不能抛出任何异常,而应该将异常存储在exchange的Exception属性中
c.异步Processor必须知道它将以什么方式完成处理,异步或同步,如果process方法返回true,则是同步完成,如果process方法返回false,则是异步完成。
d.当处理器处理完exchange时,它必须调用callback.done(boolean sync)方法,sync参数必须与process方法的返回值一致。
对于一个路由来说,完全使用异步模式可以降低线程的使用量,这要求从Consumer开始就必须使用异步的处理API(即调用异步的
process方法),如果Consumer调用的是同步process()方法,那么消费者线程在处理Exchange时将被强制阻塞。
有一点必须注意的是当你调用了异步的API,这并不意味着处理过程就是异步的,这仅仅是为不捆绑在调用者线程提供了可能。
至于是否是进行异步处理依赖于Camel路由的配置.
以上是Camel官方对异步Processor的解释,下面是本人用于测试的一个例子:
public static void main(String[] args) throws Exception {
RouteBuilder builder = new RouteBuilder() {
@Override
public void configure() throws Exception {
RouteDefinition definition1 = this.from("file:H:/temp/in");
RouteDefinition definition2 = definition1.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process1");
}
}).process(new AsyncProcessor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("process");
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
System.out.println(Thread.currentThread().getName());
System.out.println("async process");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.done(false);
return false;
}
}).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println("process2");
}
});
definition2.to("file:H:/temp/out");
}
};
DefaultCamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(builder);
camelContext.start();
Object object = new Object();
synchronized (object) {
object.wait();
}
}
当我看到异步两个字时,直觉就是使用异步Processor时会启用新的线程进行处理,但在上面的例子中,三个线程名称是一样的,
并且在阻塞了10秒后process2才打印出来,这说明上面的三个processor是在同一个线程中执行的,这也是阻塞10秒的原因。
我个人认为是对Camel异步Processor的"异步"两字理解出现了偏差,这里的异步只为processor的processor方法,提供一个
回调函数,而不是另启线程。而且我们自己写Processor处理器对这个异步的使用也很有限,因为我们写的处理器是被调用者,AsyncCallback是由上层提供的,我们只是能调用其done方法通知上层本次处理完成,而我们更多的需求应该是自己去注册回调函数,并且我们能够控制这个回调函数的回调时机,而现在我们无法提供回调函数的注册。那我们不禁要问,这个AsyncCallback对象那到底是谁提供的呢?AsyncCallback对象的源头当然是在消费者类提供的,对上面的例子来说是在FileConsumer类中,如下是GenericFileConsumer的processExchange方法的一个片段(FileConsumer继承自GenericFileConsumer)
getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
if (log.isTraceEnabled()) {
log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
}
}
});
这时创建的AsyncCallback对象就是源始的回调对象,当然在路由执行的后续过程中,该回调对象可以被包装,其中CamelInternalProcessor的process(Exchange exchange, AsyncCallback callback)方法就是一个例子:
callback = new InternalCallback(states, exchange, callback);
这里我们不禁又会问,既然CamelInternalProcessor能够对源始AsyncCallback对象进行包装加入自己的回调逻辑,为什么我们自己不行呢,其原来还是我们写的Processor是被调用者,是被包装者,具体过程可参看Camel路由启动过程。
如果非要添加自己的回调逻辑也不是不可能,就只能自己写消费者,自己写消费者就能控制源AsyncCallback对象,其后续只是对
源AsyncCallback对象的一个包装的过程,只要保证最外层的AsyncCallback对象被调用,那么源AsyncCallback对象也一定会被调用。所以在上例中,如果在第二个Processor中如果不执行callback.done(false);的话路由过程将永远不会结束,因为上层一直认为下层处理还未结束。当然如果我们不写异常Processor,路由过程还是会正常结束的,Camel内部会自行处理,但是如果我们写了异步Processor就一定要调用callback.done方法。
所以这么一通下来,并没有感受到官方提及的不阻塞调用、降价线程使用、路由分阶段处理等,个人的感觉就是多了一个回调方法,而且这个回调功能还很有限,当然这也有可能是自己什么地方理解错了,如若如此,尽请指正......
Camel之AsyncProcessor
标签:camel esb async processor asyncprocessor
原文地址:http://blog.csdn.net/xtayfjpk/article/details/40463609