上篇Camel启动路由过程中讲到启动Consumer,调用了DefaultCamelContext.startService(service)方法,下面是方法源码:
/**
 * Starts the given service. A service that also implements StartupListener is
 * first registered through addStartupListener, then started like any other.
 *
 * @param service the service to start
 * @throws Exception declared by the signature; presumably whatever service.start() raises
 */
private void startService(Service service) throws Exception {
// register listener-capable services before they are started
if (service instanceof StartupListener) {
StartupListener listener = (StartupListener) service;
addStartupListener(listener);
}
service.start();
}protected void doStart() throws Exception {
// Start-up hook of the polling consumer.
// NOTE(review): presumably ScheduledPollConsumer.doStart(); the excerpt does not name the class.
// Visible steps: run the superclass start-up, make sure a scheduler exists, hand it the
// CamelContext, this consumer and the task, start it as a service, and finally begin
// scheduling when isStartScheduler() is true.
// first call the superclass doStart()
super.doStart();
// (code omitted in this excerpt)
// create a default scheduler if none has been set
if (scheduler == null) {
scheduler = new DefaultScheduledPollConsumerScheduler();
}
scheduler.setCamelContext(getEndpoint().getCamelContext());
// per the article: sets the scheduler's consumer field
scheduler.onInit(this);
// per the article: sets the scheduler's task field
scheduler.scheduleTask(this);
// (code omitted in this excerpt)
// start the scheduler service; per the article this ends up in
// DefaultScheduledPollConsumerScheduler.doStart(), which obtains a
// java.util.concurrent.ScheduledExecutorService (a thread pool) for the scheduler,
// because polling files has to run on its own thread
ServiceHelper.startService(scheduler);
if (isStartScheduler()) {
// run the scheduler
startScheduler();
}
}@Override
// Submits `task` to scheduledExecutorService at most once (guarded by future == null).
// isUseFixedDelay() selects scheduleWithFixedDelay; otherwise scheduleAtFixedRate is used.
// Both calls receive getInitialDelay(), getDelay() and getTimeUnit().
// NOTE(review): presumably a method of DefaultScheduledPollConsumerScheduler; the excerpt does not show the class.
public void startScheduler() {
// only schedule task if we have not already done that
if (future == null) {
if (isUseFixedDelay()) {
// (code omitted in this excerpt)
future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit());
} else {
// (code omitted in this excerpt)
future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit());
}
}
}
该方法就是根据设置的参数调用线程池的不同方法。注意,这里执行的任务(task)就是FileConsumer对象,所以会执行FileConsumer的run()方法;run()方法从ScheduledPollConsumer继承而来,里面调用doRun()方法,下面是源码:
/**
 * Polling loop body executed by run(). While `done` is false it checks
 * isPollAllowed(), asks pollStrategy.begin(...) whether to proceed, calls poll()
 * and, when nothing was polled and isSendEmptyMessageWhenIdle() is true, emits an
 * empty exchange via processEmptyMessage().
 *
 * NOTE(review): this is an abridged excerpt. The catch/finally clauses of both try
 * blocks and the code that sets `done` are not shown here.
 */
private void doRun() {
boolean done = false;
int polledMessages = 0;
while (!done) {
try {
if (isPollAllowed()) {
polling = true;
try {
boolean begin = pollStrategy.begin(this, getEndpoint());
if (begin) {
retryCounter++;
// call poll() to perform the actual polling
polledMessages = poll();
LOG.trace("Polled {} messages", polledMessages);
// decide whether an empty message has to be sent
if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
// send an "empty" exchange
processEmptyMessage();
}
}
}
}
}
}
}protected int poll() throws Exception {
// Polls the configured directory, wraps every file found in an Exchange, optionally
// sorts the exchanges, processes them as one batch and returns the count reported by
// processBatch(...).
// NOTE(review): presumably GenericFileConsumer.poll(); the excerpt does not name the class.
// (code omitted in this excerpt)
// collect the files that need to be processed
List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
String name = endpoint.getConfiguration().getDirectory();
// this call polls the files out of the directory into `files`
// (limitHit is not read anywhere in the visible excerpt)
boolean limitHit = !pollDirectory(name, files, 0);
// bind each polled file to an Exchange
LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
for (GenericFile<T> file : files) {
Exchange exchange = endpoint.createExchange(file);
endpoint.configureExchange(exchange);
endpoint.configureMessage(file, exchange.getIn());
exchanges.add(exchange);
}
// sort the exchanges when a sortBy comparator is configured
if (endpoint.getSortBy() != null) {
Collections.sort(exchanges, endpoint.getSortBy());
}
// use a queue for the exchanges
Deque<Exchange> q = exchanges;
// (code omitted in this excerpt)
// hand the created Exchange objects over for batch processing
int polledMessages = processBatch(CastUtils.cast(q));
postPollCheck();
return polledMessages;
}protected boolean processExchange(final Exchange exchange) {
// Hands the exchange to getAsyncProcessor().process(...) with a callback that only
// trace-logs completion, then returns true.
// NOTE(review): heavily abridged. `target` is defined in the omitted code, and the
// extra closing brace before `return true` belongs to an omitted enclosing block.
// (a lot of code omitted in this excerpt)
getAsyncProcessor().process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// noop
if (log.isTraceEnabled()) {
log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
}
}
});
}
return true;
}@Override
// Chooses between synchronous and asynchronous processing of the exchange.
// When the exchange is transacted, or the UNIT_OF_WORK_PROCESS_SYNC property was set
// (it is removed while being read), the wrapped processor runs synchronously, any
// Throwable is stored on the exchange, callback.done(true) is invoked and true is returned.
// Otherwise the asynchronous overload processor.process(exchange, async) is used.
// NOTE(review): `async` and the return of the else path are in the omitted code; the
// owning class is not named in the excerpt.
public boolean process(Exchange exchange, AsyncCallback callback) {
// (code omitted in this excerpt)
Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
if (exchange.isTransacted() || synchronous != null) {
// (code omitted in this excerpt)
try {
processor.process(exchange);// synchronous processing
} catch (Throwable e) {
exchange.setException(e);
}
callback.done(true);
return true;
} else {
// (code omitted in this excerpt)
boolean sync = processor.process(exchange, async);// asynchronous processing
// (code omitted in this excerpt)
}
}public boolean process(Exchange exchange, AsyncCallback callback) {
// Runs the exchange through the processors returned by getProcessors(), one after
// another, for as long as continueRouting(...) allows. The first processor receives
// the incoming exchange; every later one receives createNextExchange(previous).
// After the loop callback.done(true) is invoked and true is returned.
// NOTE(review): the result `sync` is not inspected in the visible excerpt; the owning
// class is not named here (looks like a pipeline-style processor; confirm).
Iterator<Processor> processors = getProcessors().iterator();
Exchange nextExchange = exchange;
boolean first = true;
while (continueRouting(processors, nextExchange)) {
if (first) {
first = false;
} else {
// prepare for next run
nextExchange = createNextExchange(nextExchange);
}
// get the next processor
Processor processor = processors.next();
// adapt the plain Processor so it can be invoked through the async API
AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
boolean sync = process(exchange, nextExchange, callback, processors, async);
}
callback.done(true);
return true;
}@Override
// Resolves the target endpoint from the route context and wraps it, together with
// getPattern(), in a new SendProcessor.
public Processor createProcessor(RouteContext routeContext) throws Exception {
Endpoint endpoint = resolveEndpoint(routeContext);
return new SendProcessor(endpoint, getPattern());
}
这里就是创建一个SendProcessor对象,下面是SendProcessor的process方法:
/**
 * SendProcessor's process method (abridged). When a producer is available the
 * exchange is delegated to producer.process(...) with a wrapping callback. On
 * completion that callback restores the exchange pattern captured up front, stops
 * the stopwatch, fires the "exchange sent" event and, in a finally block, always
 * forwards to the caller's callback.
 *
 * NOTE(review): `target`, `watch` and `destination` are defined in the omitted code,
 * as is the path taken when producer is null.
 *
 * @param exchange the exchange to send
 * @param callback invoked with the same doneSync flag once the producer has finished
 * @return the value returned by producer.process(...) on the visible path
 */
public boolean process(Exchange exchange, final AsyncCallback callback) {
// (code omitted in this excerpt)
// remember the current pattern so the callback can restore it
final ExchangePattern existingPattern = exchange.getPattern();
// if we have a producer then use that as its optimized
if (producer != null) {
// (code omitted in this excerpt)
return producer.process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
try {
// restore previous MEP
target.setPattern(existingPattern);
// emit event that the exchange was sent to the endpoint
long timeTaken = watch.stop();
EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken);
} finally {
callback.done(doneSync);
}
}
});
}
// (code omitted in this excerpt)
}protected void processExchange(Exchange exchange, String target) throws Exception {
// Writes the exchange out as a file and records the produced name in the
// Exchange.FILE_NAME_PRODUCED header. Any Exception is routed to
// handleFailedWrite(exchange, e); postWriteCheck() runs after the try/catch.
// NOTE(review): `tempTarget` comes from the omitted code; the owning class is not
// named in the excerpt (presumably the file producer; confirm).
try {
// (a lot of code omitted in this excerpt)
// write/upload the file
writeFile(exchange, tempTarget != null ? tempTarget : target);
// (a lot of code omitted in this excerpt)
exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
} catch (Exception e) {
handleFailedWrite(exchange, e);
}
postWriteCheck();
}
原文地址:http://blog.csdn.net/xtayfjpk/article/details/39120427