Apache Camel源码研究之CamelInternalProcessor

之前博客Apache Camel源码研究之启动限于篇幅和论述重点的原因,对很多关键性问题只是进行了概述或者根本没有提及,本篇博客正是对其进行补充性完善。

1. 概述

在之前的博客Apache Camel源码研究之启动中我们介绍了Apache Camel初始化启动时的全局逻辑,并顺带部分介绍了Apache Camel执行运行时逻辑时其组件类CamelInternalProcessor起到的关键性作用。本文接下来的部分将再次尝试剖析CamelInternalProcessor类,意图给读者一个更高清晰的认识,降低理解的入门门槛,做到应用时的胸有成竹。

2. 解读

首先我们借用下上一篇博客 Apache Camel源码研究之Error Handler的测试用例代码。强烈建议结合本文下一小节的流程图进行对照理解

CamelTestUtil.defaultPrepareTest2(new RouteBuilder() {
	
	@Override
	public void configure() throws Exception {	
	    // 省略Error Handler配置部分
		...

		from("stream:in?promptMessage=Enter something:")//
				.process(MessageDetailReportProcessor.me)//
				// 我们自定义的Processor实现在Camel内部使用 DelegateSyncProcessor 进行统一包装, 避免直接接触,建立中间层,增加系统弹性。
				.process(new Processor() {
					@Override
					public void process(Exchange exchange) throws Exception {
						final Integer camelRedeliveryCounter = exchange.getIn().getHeader(
								Exchange.REDELIVERY_COUNTER, int.class);
						Console.log("当前线程名称: " + Thread.currentThread().getName());
						if(camelRedeliveryCounter < 2){
							throw new RuntimeException("LQ" + camelRedeliveryCounter);
						}
					}
				});
	}
});

以上用例执行之后,我们将得到如下堆栈:
Apache Camel源码研究之CamelInternalProcessor
正如之前博客所言,逻辑处理链条从CamelInternalProcessor开始(起始细节省略),其核心方法正是对接口AsyncProcessor的实现:

// CamelInternalProcessor.process()
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {

	...
	
	// 迭代回调配置的 CamelInternalProcessorAdvice 实例
    final List<Object> states = new ArrayList<Object>(advices.size());
    for (CamelInternalProcessorAdvice task : advices) {
        try {
            Object state = task.before(exchange);
            states.add(state);
        } catch (Throwable e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }
	
	// 在本callback的回调中, 将最终反向回调上述成功执行的CamelInternalProcessorAdvice实例的after方法.
    // create internal callback which will execute the advices in reverse order when done
    callback = new InternalCallback(states, exchange, callback);

    // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
    Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
    if (exchange.isTransacted() || synchronous != null) {
        // must be synchronized for transacted exchanges

        try {
        	// 回调CamelInternalProcessor实例构建使用配置的processor, 将Camel执行逻辑向前推动
        	// 在本例中, 该实例为Pipeline类型(看名字就能大致猜到其执行逻辑)
            processor.process(exchange);
        } catch (Throwable e) {
            exchange.setException(e);
        }

        callback.done(true);
        return true;
    } else {
        final UnitOfWork uow = exchange.getUnitOfWork();

        // allow unit of work to wrap callback in case it need to do some special work
        // for example the MDCUnitOfWork
        AsyncCallback async = callback;
        if (uow != null) {
            async = uow.beforeProcess(processor, exchange, callback);
        }
		// 同上
        boolean sync = processor.process(exchange, async);

        // execute any after processor work (in current thread, not in the callback)
        if (uow != null) {
            uow.afterProcess(processor, exchange, callback, sync);
        }
		
        return sync;
    }
}

挑选堆栈中的部分核心类总结如下:

  1. CamelInternalProcessor。Apache Camel逻辑处理链条从CamelInternalProcessor开始。本例中将生成一个CamelInternalProcessor实例。
  2. PipelineCamelInternalProcessor实例中的processor字段指向的实例实际类型正是Pipeline。其对AsyncProcessor接口的实现就是既定顺序挨个执行用户配置的processor。本例中用户配置的两个Processor正是被填入到本类中的processors字段。本例中将生成一个DefaultChannel实例。
  3. DefaultChannel。用户配置的Processor,在Camel内部将最终被Wrap为该类型实例向外提供。被放入到Pipeline中的Processor实例真实类型正是它。本例中将生成两个DefaultChannel实例。

3. 结构图

以下便是本文测试用例中构造出的CamelInternalProcessor实例其内部的主体结构,以及相应的执行链条逻辑(使用Processon进行绘制的)。
Apache Camel源码研究之CamelInternalProcessor
为佐证以上流程图,特提供以下截图:

  1. 本例中CamelInternalProcessor里配置的CamelInternalProcessorAdvice实例集合
    Apache Camel源码研究之CamelInternalProcessor
  2. 本例中DefaultChannel里配置的CamelInternalProcessorAdvice实例集合
    Apache Camel源码研究之CamelInternalProcessor
  3. 本例中Pipeline实例中填充进的两个DefaultChannel实例,其正对应本文测试用例中的自己配置的两个的Processor。
    Apache Camel源码研究之CamelInternalProcessor
    Apache Camel源码研究之CamelInternalProcessor

4. Links

  1. Apache Camel源码研究之启动
  2. Apache Camel源码研究之InterceptStrategy
  3. Apache Camel源码研究之Error Handler
Apache Camel源码研究之CamelInternalProcessorApache Camel源码研究之CamelInternalProcessor 夫礼者 发布了137 篇原创文章 · 获赞 27 · 访问量 13万+ 私信 关注
上一篇:Camel 的 SEDA、Direct 和 VM 组件指南


下一篇:python3 str.translate method