之前博客Apache Camel源码研究之启动限于篇幅和论述重点的原因,对很多关键性问题只是进行了概述或者根本没有提及,本篇博客正是对其进行补充性完善。
1. 概述
在之前的博客Apache Camel源码研究之启动中我们介绍了Apache Camel初始化启动时的全局逻辑,并顺带部分介绍了Apache Camel执行运行时逻辑时其组件类CamelInternalProcessor
起到的关键性作用。本文接下来的部分将再次尝试剖析CamelInternalProcessor
类,意图给读者一个更高清晰的认识,降低理解的入门门槛,做到应用时的胸有成竹。
2. 解读
首先我们借用下上一篇博客 Apache Camel源码研究之Error Handler的测试用例代码。强烈建议结合本文下一小节的流程图进行对照理解。
CamelTestUtil.defaultPrepareTest2(new RouteBuilder() {
@Override
public void configure() throws Exception {
// 省略Error Handler配置部分
...
from("stream:in?promptMessage=Enter something:")//
.process(MessageDetailReportProcessor.me)//
// 我们自定义的Processor实现在Camel内部使用 DelegateSyncProcessor 进行统一包装, 避免直接接触,建立中间层,增加系统弹性。
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
final Integer camelRedeliveryCounter = exchange.getIn().getHeader(
Exchange.REDELIVERY_COUNTER, int.class);
Console.log("当前线程名称: " + Thread.currentThread().getName());
if(camelRedeliveryCounter < 2){
throw new RuntimeException("LQ" + camelRedeliveryCounter);
}
}
});
}
});
以上用例执行之后,我们将得到如下堆栈:
正如之前博客所言,逻辑处理链条从CamelInternalProcessor
开始(起始细节省略),其核心方法正是对接口AsyncProcessor
的实现:
// CamelInternalProcessor.process()
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
...
// 迭代回调配置的 CamelInternalProcessorAdvice 实例
final List<Object> states = new ArrayList<Object>(advices.size());
for (CamelInternalProcessorAdvice task : advices) {
try {
Object state = task.before(exchange);
states.add(state);
} catch (Throwable e) {
exchange.setException(e);
callback.done(true);
return true;
}
}
// 在本callback的回调中, 将最终反向回调上述成功执行的CamelInternalProcessorAdvice实例的after方法.
// create internal callback which will execute the advices in reverse order when done
callback = new InternalCallback(states, exchange, callback);
// UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
if (exchange.isTransacted() || synchronous != null) {
// must be synchronized for transacted exchanges
try {
// 回调CamelInternalProcessor实例构建使用配置的processor, 将Camel执行逻辑向前推动
// 在本例中, 该实例为Pipeline类型(看名字就能大致猜到其执行逻辑)
processor.process(exchange);
} catch (Throwable e) {
exchange.setException(e);
}
callback.done(true);
return true;
} else {
final UnitOfWork uow = exchange.getUnitOfWork();
// allow unit of work to wrap callback in case it need to do some special work
// for example the MDCUnitOfWork
AsyncCallback async = callback;
if (uow != null) {
async = uow.beforeProcess(processor, exchange, callback);
}
// 同上
boolean sync = processor.process(exchange, async);
// execute any after processor work (in current thread, not in the callback)
if (uow != null) {
uow.afterProcess(processor, exchange, callback, sync);
}
return sync;
}
}
挑选堆栈中的部分核心类总结如下:
-
CamelInternalProcessor
。Apache Camel逻辑处理链条从CamelInternalProcessor
开始。本例中将生成一个CamelInternalProcessor实例。 -
Pipeline
。CamelInternalProcessor
实例中的processor字段指向的实例实际类型正是Pipeline
。其对AsyncProcessor
接口的实现就是既定顺序挨个执行用户配置的processor。本例中用户配置的两个Processor正是被填入到本类中的processors
字段。本例中将生成一个DefaultChannel实例。 -
DefaultChannel
。用户配置的Processor,在Camel内部将最终被Wrap为该类型实例向外提供。被放入到Pipeline中的Processor实例真实类型正是它。本例中将生成两个DefaultChannel实例。
3. 结构图
以下便是本文测试用例中构造出的CamelInternalProcessor
实例其内部的主体结构,以及相应的执行链条逻辑(使用Processon进行绘制的)。
为佐证以上流程图,特提供以下截图:
- 本例中CamelInternalProcessor里配置的CamelInternalProcessorAdvice实例集合
- 本例中DefaultChannel里配置的CamelInternalProcessorAdvice实例集合
- 本例中Pipeline实例中填充进的两个DefaultChannel实例,其正对应本文测试用例中的自己配置的两个的Processor。