上篇Camel启动路由过程中讲到启动Consumer,调用了DefaultCamelContext.startService(service)方法,下面是方法源码:
/**
 * Starts the given service.
 *
 * <p>When the service also implements {@code StartupListener} it is registered
 * through {@code addStartupListener} before {@code start()} is invoked on it.
 *
 * @param service the service to start
 * @throws Exception propagated from {@code addStartupListener} or {@code service.start()}
 */
private void startService(Service service) throws Exception {
    final boolean isStartupListener = service instanceof StartupListener;
    if (isStartupListener) {
        // Register first, then start: same order as the direct cast-and-add form.
        addStartupListener((StartupListener) service);
    }
    service.start();
}
/**
 * Starts this polling consumer (abridged excerpt; "omitted" markers show where
 * the article left code out).
 *
 * <p>Visible steps: delegate to the parent {@code doStart()}, make sure a
 * scheduler exists, hand it the CamelContext, this consumer and the task,
 * start it as a service and, when {@code isStartScheduler()} is true, begin
 * scheduling.
 *
 * @throws Exception propagated from the parent start or from starting the scheduler
 */
protected void doStart() throws Exception {
    // Call the parent class's doStart() first
    super.doStart();
    // omitted...
    // If no scheduler has been set, create a default one
    if (scheduler == null) {
        scheduler = new DefaultScheduledPollConsumerScheduler();
    }
    scheduler.setCamelContext(getEndpoint().getCamelContext());
    // Sets the scheduler's consumer member
    scheduler.onInit(this);
    // Sets the scheduler's task member
    scheduler.scheduleTask(this);
    // omitted...
    // Start the scheduler; per the article this ends up in
    // DefaultScheduledPollConsumerScheduler.doStart(), which obtains a
    // java.util.concurrent.ScheduledExecutorService (a thread pool) for the scheduler.
    // Polling files has to happen on a separate thread, hence the thread pool.
    ServiceHelper.startService(scheduler);
    if (isStartScheduler()) {
        // Run the scheduler
        startScheduler();
    }
}
@Override public void startScheduler() { // only schedule task if we have not already done that if (future == null) { if (isUseFixedDelay()) { //省略... future = scheduledExecutorService.scheduleWithFixedDelay(task, getInitialDelay(), getDelay(), getTimeUnit()); } else { //省略... future = scheduledExecutorService.scheduleAtFixedRate(task, getInitialDelay(), getDelay(), getTimeUnit()); } } }该方法就是根据设置参数调用线程池不同的方法,注意,这里执行的任务(task)就是FileConsumer对象,所以会执行FileConsumer的run()方法,run()方法从ScheduledPollConsumer继承而来,里面调用doRun()方法,下面是源码:
/**
 * Body of one scheduled run (abridged excerpt).
 *
 * <p>While {@code done} is false and polling is allowed, asks the poll strategy
 * whether to begin, calls {@code poll()}, and sends an empty message when
 * nothing was polled and {@code isSendEmptyMessageWhenIdle()} is true.
 *
 * <p>NOTE(review): both {@code try} blocks appear without catch/finally
 * clauses, and nothing visible sets {@code done}; the excerpt seems to have
 * dropped that code, so it does not compile as shown.
 */
private void doRun() {
    boolean done = false;
    int polledMessages = 0;
    while (!done) {
        try {
            if (isPollAllowed()) {
                polling = true;
                try {
                    boolean begin = pollStrategy.begin(this, getEndpoint());
                    if (begin) {
                        retryCounter++;
                        // Call poll() to perform the actual polling
                        polledMessages = poll();
                        LOG.trace("Polled {} messages", polledMessages);
                        // Decide whether an empty message has to be sent
                        if (polledMessages == 0 && isSendEmptyMessageWhenIdle()) {
                            // send an "empty" exchange
                            processEmptyMessage();
                        }
                    }
                }
            }
        }
    }
}
/**
 * Polls the configured directory and processes the files found (abridged excerpt).
 *
 * <p>Visible steps: collect files via {@code pollDirectory}, create and
 * configure one {@code Exchange} per file, optionally sort them with the
 * endpoint's {@code getSortBy()} comparator, then pass them to
 * {@code processBatch}.
 *
 * @return the value returned by {@code processBatch}
 * @throws Exception propagated from the calls made here
 */
protected int poll() throws Exception {
    // omitted...
    // Collect the files that need to be processed
    List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
    String name = endpoint.getConfiguration().getDirectory();
    // This call polls the files into the list
    boolean limitHit = !pollDirectory(name, files, 0);
    // Bind each polled file to an Exchange
    LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
    for (GenericFile<T> file : files) {
        Exchange exchange = endpoint.createExchange(file);
        endpoint.configureExchange(exchange);
        endpoint.configureMessage(file, exchange.getIn());
        exchanges.add(exchange);
    }
    // Sort only when a comparator has been configured
    if (endpoint.getSortBy() != null) {
        Collections.sort(exchanges, endpoint.getSortBy());
    }
    // use a queue for the exchanges
    Deque<Exchange> q = exchanges;
    // omitted...
    // Hand the created Exchange objects over for batch processing
    int polledMessages = processBatch(CastUtils.cast(q));
    postPollCheck();
    return polledMessages;
}
/**
 * Hands a single file exchange to the asynchronous processor (heavily abridged excerpt).
 *
 * <p>The callback only emits a trace log; the method returns {@code true} on
 * the visible path.
 *
 * <p>NOTE(review): {@code target} is not declared in the excerpt, and the lone
 * closing brace before {@code return true;} presumably matches an opening
 * block in the omitted code. Not compilable as shown.
 *
 * @param exchange the exchange to process
 * @return {@code true} on the path shown here
 */
protected boolean processExchange(final Exchange exchange) {
    // a lot of code omitted...
    getAsyncProcessor().process(exchange, new AsyncCallback() {
        public void done(boolean doneSync) {
            // noop
            if (log.isTraceEnabled()) {
                log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
            }
        }
    });
    }
    return true;
}
/**
 * Processes the exchange synchronously or asynchronously (abridged excerpt).
 *
 * <p>The synchronous path is taken when the exchange is transacted or carried
 * the {@code Exchange.UNIT_OF_WORK_PROCESS_SYNC} property (which is removed
 * here). Any {@code Throwable} from the synchronous call is stored on the
 * exchange and the callback is completed with {@code true}.
 *
 * <p>NOTE(review): {@code async} and the return statement of the else branch
 * are not shown; presumably they live in the omitted code.
 *
 * @param exchange the exchange to process
 * @param callback completion callback
 * @return {@code true} on the synchronous path shown here
 */
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
    // omitted...
    Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
    if (exchange.isTransacted() || synchronous != null) {
        // omitted...
        try {
            processor.process(exchange); // synchronous processing
        } catch (Throwable e) {
            exchange.setException(e);
        }
        callback.done(true);
        return true;
    } else {
        // omitted...
        boolean sync = processor.process(exchange, async); // asynchronous processing
        // omitted...
    }
}
public boolean process(Exchange exchange, AsyncCallback callback) { Iterator<Processor> processors = getProcessors().iterator(); Exchange nextExchange = exchange; boolean first = true; while (continueRouting(processors, nextExchange)) { if (first) { first = false; } else { // prepare for next run nextExchange = createNextExchange(nextExchange); } // get the next processor Processor processor = processors.next(); AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); boolean sync = process(exchange, nextExchange, callback, processors, async); } callback.done(true); return true; }
/**
 * Creates the processor for this definition: a {@code SendProcessor} bound to
 * the endpoint resolved from the route context, using {@code getPattern()}.
 *
 * @param routeContext the route context used to resolve the endpoint
 * @return a new {@code SendProcessor}
 * @throws Exception propagated from endpoint resolution
 */
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
    return new SendProcessor(resolveEndpoint(routeContext), getPattern());
}
这就是创建一个SendProcessor对象,下面是SendProcessor对象的process方法:
/**
 * Sends the exchange to the endpoint's producer (abridged excerpt).
 *
 * <p>When a producer is available the exchange is passed to it with a wrapping
 * callback that restores the previous exchange pattern, emits the
 * "exchange sent" event, and always completes the caller's callback.
 *
 * <p>NOTE(review): {@code target}, {@code watch} and {@code destination} are
 * not declared in the excerpt, and the path for {@code producer == null} is
 * omitted; presumably both are in the omitted code.
 *
 * @param exchange the exchange to send
 * @param callback the caller's completion callback
 * @return the result of {@code producer.process} on the path shown here
 */
public boolean process(Exchange exchange, final AsyncCallback callback) {
    // omitted...
    final ExchangePattern existingPattern = exchange.getPattern();
    // if we have a producer then use that as its optimized
    if (producer != null) {
        // omitted...
        return producer.process(exchange, new AsyncCallback() {
            @Override
            public void done(boolean doneSync) {
                try {
                    // restore previous MEP
                    target.setPattern(existingPattern);
                    // emit event that the exchange was sent to the endpoint
                    long timeTaken = watch.stop();
                    EventHelper.notifyExchangeSent(target.getContext(), target, destination, timeTaken);
                } finally {
                    // complete the caller's callback even if the steps above throw
                    callback.done(doneSync);
                }
            }
        });
    }
    // omitted...
}
/**
 * Writes the exchange to the target file (heavily abridged excerpt).
 *
 * <p>Writes to {@code tempTarget} when it is non-null, otherwise to
 * {@code target}, then records {@code target} in the
 * {@code Exchange.FILE_NAME_PRODUCED} header. Any exception is passed to
 * {@code handleFailedWrite}; {@code postWriteCheck()} runs afterwards unless
 * {@code handleFailedWrite} itself throws.
 *
 * <p>NOTE(review): {@code tempTarget} is not declared in the excerpt;
 * presumably it is set in the omitted code.
 *
 * @param exchange the exchange whose content is written
 * @param target   the target file name
 * @throws Exception declared by the signature; in the visible code it can only
 *                   come from {@code handleFailedWrite} or {@code postWriteCheck}
 */
protected void processExchange(Exchange exchange, String target) throws Exception {
    try {
        // a lot of code omitted...
        // write/upload the file
        writeFile(exchange, tempTarget != null ? tempTarget : target);
        // a lot of code omitted...
        exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
    } catch (Exception e) {
        handleFailedWrite(exchange, e);
    }
    postWriteCheck();
}
原文地址:http://blog.csdn.net/xtayfjpk/article/details/39120427