文章目录
Activiti7源码分析
整体架构
Activiti采用了一个分层架构完成自底向上的包装。架构图如下
大致包括:
- 核心接口层,被PVM接口定义。PVM会在后面的章节中详细讲述。
- 核心实现层,基于PVM的思想和接口,定义了一些关键实体包含ActivityImpl(该类抽象了节点实现),FlowElementBehavior实现(该类抽象了节点指令动作),ExecutionImpl(流程执行实体类)
- 命令层,Activiti在编码模式上直接限定整体风格为命令模式。也就是将业务逻辑封装为一个个的Command接口实现类。这样新增一个业务功能时只需要新增一个Command实现即可。这里需要特别提到的是,命令本身需要运行在命令上下文中,也就是CommandContext类对象。
- 命令拦截层,采用责任链模式,通过责任链模式的拦截器层,为命令的执行创造条件。诸如开启事务,创建CommandContext上下文,记录日志等
- 业务接口层,面向业务,提供了各种接口。这部分的接口就不再面向框架开发者了,而是面向框架的使用者。
- 部署层,严格来说,这个与上面说到的并不是一个完整的分层体系。但是为了突出重要性,单独拿出来说。流程运转的前提是流程定义。而流程定义解析就是一切的开始。从领域语言解析为Java的POJO对象依靠的就是部署层。后文还会细说这个环节。
- 流程引擎,所有接口的总入口。上面提到的业务接口层,部署层都可以从流程引擎类中得到。因此这里的流程引擎接口其实类似门面模式,只作为提供入口。
命令模式
Activit整体上采用命令模式进行代码功能解耦。将流程引擎的大部分涉及到客户端的需求让外部以具体命令实现类的方式实现。
完成这个编码模式,有几个重点类需要关注
-
Command
命令接口,所有的具体命令都需要实现该类,最终业务就是执行该类的execute方法。 -
CommandContext
命令上下文,为具体命令的执行提供上下文支撑。该上下文的生成是依靠命令拦截器中的上下文拦截器org.activiti.engine.impl.interceptor.CommandContextInterceptor
来生成的。该拦截器会判断是复用当前的上下文还是生成新的上下文。
引擎内的大部分功能都是通过单独的命令完成。
责任链模式
Activiti的命令模式还需要搭配其对应的责任链来完成。具体来说,Activiti中存在一个命令拦截器链条,该命令拦截器链条由几大块的拦截器实现组成,如下
其中重要的默认拦截器有2个:
- 事务拦截器,主要职责是使得后续命令运行在事务环境下。
- CommandContext拦截器,主要职责是在有必要的时候创建CommandContext对象,并在使用完成后关闭该上下文。
常用拦截器:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TC4Db9aa-1615645423630)(D:\codes\java-learning-service\笔记\第三方开源框架\activiti7\images\image-20210104154046405.png)]
责任链模式构建源码分析:
ProcessEngineConfigurationImpl:
public void initCommandExecutors() {
initDefaultCommandConfig();
initSchemaCommandConfig();
initCommandInvoker();
initCommandInterceptors();//初始化责任链(Interceptor连起来)
initCommandExecutor();
}
public void initCommandInvoker() {
if (commandInvoker == null) {
if (enableVerboseExecutionTreeLogging) {
commandInvoker = new DebugCommandInvoker();
} else {
commandInvoker = new CommandInvoker();
}
}
}
//拦截器顺序:customPreCommandInterceptors->defaultCommandInterceptors->customPostCommandInterceptors->commandInvoker(最后执行命令)
public void initCommandInterceptors() {
if (commandInterceptors == null) {
commandInterceptors = new ArrayList<CommandInterceptor>();
if (customPreCommandInterceptors != null) {
commandInterceptors.addAll(customPreCommandInterceptors);
}
commandInterceptors.addAll(getDefaultCommandInterceptors());
if (customPostCommandInterceptors != null) {
commandInterceptors.addAll(customPostCommandInterceptors);
}
commandInterceptors.add(commandInvoker);
}
}
//获取系统默认的拦截器
public Collection<? extends CommandInterceptor> getDefaultCommandInterceptors() {
List<CommandInterceptor> interceptors = new ArrayList<CommandInterceptor>();
interceptors.add(new LogInterceptor());//1.日志拦截器
CommandInterceptor transactionInterceptor = createTransactionInterceptor();
if (transactionInterceptor != null) {
interceptors.add(transactionInterceptor);//2.事务拦截器
}
if (commandContextFactory != null) {
interceptors.add(new CommandContextInterceptor(commandContextFactory, this));//3.命令上下文拦截器
}
if (transactionContextFactory != null) {
interceptors.add(new TransactionContextInterceptor(transactionContextFactory));
}
return interceptors;
}
事务拦截器
事务拦截器是否提供取决于org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl
的子类对方法createTransactionInterceptor
的实现。独立使用时的org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration
该方法返回为空。也就是不提供事务拦截器。此时,命令的运行就无法通过事务拦截器来提供事务环境了。
命令上下文拦截器
实现类:org.activiti.engine.impl.interceptor.CommandContextInterceptor。
该拦截器的功能非常重要,可以说是Activiti操作的核心之一。其作用是在后续拦截器执行前检查当前上下文环境,如果不存在CommandContext对象,则创建一个;在后续拦截器执行后,将CommandContext对象close。CommandContext包含了本次操作中涉及到所有的数据对象。
public class CommandContextInterceptor extends AbstractCommandInterceptor {
private static final Logger log = LoggerFactory.getLogger(CommandContextInterceptor.class);
protected CommandContextFactory commandContextFactory;
protected ProcessEngineConfigurationImpl processEngineConfiguration;
public CommandContextInterceptor() {
}
public CommandContextInterceptor(CommandContextFactory commandContextFactory, ProcessEngineConfigurationImpl processEngineConfiguration) {
this.commandContextFactory = commandContextFactory;
this.processEngineConfiguration = processEngineConfiguration;
}
public <T> T execute(CommandConfig config, Command<T> command) {
CommandContext context = Context.getCommandContext();
boolean contextReused = false;
// We need to check the exception, because the transaction can be in a
// rollback state, and some other command is being fired to compensate (eg. decrementing job retries)
if (!config.isContextReusePossible() || context == null || context.getException() != null) {
//如果不存在CommandContext对象,则创建一个
context = commandContextFactory.createCommandContext(command);
} else {
log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName());
contextReused = true;
context.setReused(true);
}
try {
// Push on stack
Context.setCommandContext(context);
Context.setProcessEngineConfiguration(processEngineConfiguration);
if (processEngineConfiguration.getActiviti5CompatibilityHandler() != null) {
Context.setActiviti5CompatibilityHandler(processEngineConfiguration.getActiviti5CompatibilityHandler());
}
return next.execute(config, command);
} catch (Throwable e) {
context.exception(e);
} finally {
try {
if (!contextReused) {
context.close();
}
} finally {
// Pop from stack
Context.removeCommandContext();
Context.removeProcessEngineConfiguration();
Context.removeBpmnOverrideContext();
Context.removeActiviti5CompatibilityHandler();
}
}
return null;
}
public CommandContextFactory getCommandContextFactory() {
return commandContextFactory;
}
public void setCommandContextFactory(CommandContextFactory commandContextFactory) {
this.commandContextFactory = commandContextFactory;
}
public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {
return processEngineConfiguration;
}
public void setProcessEngineContext(ProcessEngineConfigurationImpl processEngineContext) {
this.processEngineConfiguration = processEngineContext;
}
}
流程定义解析
Activiti遵循BPMN2.0规范,因此框架中少不了对BPMN2.0规范的定义文件(XML形式)的解析类。Activiti采用的STAX的拉模型进行XML解析。这里先不分析其具体的解析类的内在联系,而是概念性的阐述下Activiti对解析的概念分层。
首先通过类org.activiti.bpmn.converter.BpmnXMLConverter
进行XML解析,解析为org.activiti.bpmn.model
包下面的与各个XML元素定义对应的POJO类。此时这些POJO类仅仅只是XML文件的一个Java表达。
在通过类org.activiti.engine.impl.bpmn.parser.BpmnParser
聚合不同的解析类,将上面步骤解析出来的POJO类进一步解析为可以在框架中利用的org.activiti.engine.impl.pvm.process
包下面的类。典型的代表就是ActivityImpl类。
三者之间的关系简单用图表达就是
Activiti之PVM执行树
核心理念
任何框架都是核心理念上发展细化而来。Activiti的核心理念就是流程虚拟机(Process Virtual Machine,以下简称PVM)。PVM试图提供一组API,通过API本身来描述工作流方面的各种可能性。没有了具体实现,也使得PVM本身可以较好的适应各种不同的工作流领域语言,而Activiti本身也是在PVM上的一种实现。
PVM对流程定义期的描述
首先来看下流程定义本身。在工作流中,流程定义可以图形化的表达为一组节点和连接构成的集合。比如下图
即使没有任何知识也能大概明白这张图表达的是一个流程以及执行顺序的意图。流程定义的表达方式不限,可以使用图形的方式表达,可以使用领域语言,也可以传统的XML(比如Activiti用的就是BPMN2.0 Schema下的XML)。特别的,当前已经有了标准化的BPMN2.0规范。
PVM将流程定义描述为流程元素的集合。再将流程元素细分为2个子类:流程节点和连线。
- 流程节点是某一种动作表达的抽象描述。节点本身是可以嵌套的,也就是节点可以拥有子节点。
- 连线表达是不同节点之间的转移关系。一个连线只能有一个源头节点和一个目标节点。而节点本身可以有任意多的进入连线和外出连线。
从类图的角度也能很好的看出这种关系,流程节点PvmActivity和连线PvmTransition都是流程元素PvmProcessElement。
从类图可以看到PvmActivity继承于PvmScope。这种继承关系表明流程节点本身有其归于的作用域(PvmScope),节点本身也可能是另外一些节点的作用域,这也符合节点可能拥有子节点的原则。关于作用域本身,后文还会再次详细讲解,这里先按下不表
PVM对流程运行期的描述
通过流程节点和连线,PVM完成了对流程定义的表达。流程定义是一个流程的静态表达,流程执行则是依照流程定义启动的一个运行期表达,每一个流程执行都具备自己唯一的生命周期。流程执行需要具备以下要素:
-
流程节点的具体执行动作。
Activiti提供了接口
org.activiti.engine.impl.pvm.delegate.ActivityBehavior
。该接口内部仅有一个execute方法。该接口的实现即为不同PvmActivity节点提供了具体动作。ActivityBehavior有丰富的不同实现,对应了流程中丰富的不同功能的节点。每一个PvmActivity对象都会持有一个ActivityBehavior对象。 -
流程执行当前处于哪一个流程节点。
Activiti提供了接口
org.activiti.engine.impl.pvm.PvmExecution
。该接口有一个方法PvmActivity getActivity()
。用以返回当前流程执行所处的流程节点。
PVM综述
从上面对PVM定义期和运行期的解释可以看出,整个概念体系并不复杂。涉及到的类也不多。正是因为PVM只对工作流中最基础的部分做了抽象和接口定义,使得PVM的实现上有了很多的可能性。
然而也正是由于定义的简单性,实际上这套PVM在转化为实际实现的时候需要额外附加很多的特性才能真正完成框架需求。
ActivitiImpl与作用域
在解析完成后,一个流程定义中的所有节点都会被解析为ActivityImpl对象。ActivityImpl对象本身可以持有事件订阅(根据BPMN2.0规范,目前有定时,消息,信号三种事件订阅类型)。因为ActivityImpl本身可以嵌套并且可以持有订阅,因此引入作用域概念(Scope)。
一个ActivityImpl在以下两种情况下会被定义为作用域ActivityImpl。
- 该ActivityImpl是可变范围,则它是一个作用域。可变范围可以理解为该节点的内容定义是可变的。比如流程定义、子流程,其内部内容是可变的。根据BPMN定义,可变范围有:流程定义,子流程,多实例,调用活动。
- 该ActivityImpl定义了一个上下文用于接收事件。比如:具备边界事件的ActivityImpl,具备事件子流程的ActivityImpl,事件驱动网关,中间事件捕获ActivityImpl。
作用域是一个很重要的概念,情况1中作用域定义的是复杂节点的生命周期,情况2中作用域定义的是事件的捕获范围。
发起流程实例源码
核心步骤
-
RuntimeServiceImpl#startProcessInstanceByKey
-
cmd.StartProcessInstanceCmd.execute()[日志,上下文,事务,命令模式调用者(execute cmd)]
-
#1.planOperation(new ContinueProcessOperation(commandContext, execution))
-
ContinueProcessOperation#continueThroughFlowNode(currentFlowElement:startEvent)
NoneStartEventActivityBehavior.execute(execution)
#2.planOperation(new TakeOutgoingSequenceFlowsOperation(commandContext, execution, evaluateConditions))
-
TakeOutgoingSequenceFlowsOperation (currentFlowElement:startEvent)
#leaveFlowNode
#findOutgoingFlows
#3.planOperation(new ContinueProcessOperation(commandContext, execution))//循环#1操作
-
ContinueProcessOperation#continueThroughFlowNode(currentFlowElement:userTask)
UserStartEventActivityBehavior.execute(execution)//重复步骤4
service API入口源码:
org.activiti.engine.RuntimeService#startProcessInstanceByKey()
>org.activiti.engine.impl.cfg.CommandExecutorImpl#execute(command)
多个对象责任链依次执行:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UrgFNzHT-1615645423635)(\images\image-20210105142628489.png)]
CommandInvoker命令模式执行者源码:
public <T> T execute(final CommandConfig config, final Command<T> command) {
final CommandContext commandContext = Context.getCommandContext();
// Execute the command.
// 将任务加入List<Runnable> operations中缓存,
commandContext.getAgenda().planOperation(new Runnable() {
@Override
public void run() {
commandContext.setResult(command.execute(commandContext));
}
});
// 循环执行任务==执行cmd中的execute方法
executeOperations(commandContext);
// At the end, call the execution tree change listeners.
// TODO: optimization: only do this when the tree has actually changed (ie check dbSqlSession).
if (commandContext.hasInvolvedExecutions()) {
Context.getAgenda().planExecuteInactiveBehaviorsOperation();
executeOperations(commandContext);
}
return (T) commandContext.getResult();
}
@Override
public void planOperation(Runnable operation) {
operations.add(operation);
if (operation instanceof AbstractOperation) {
ExecutionEntity execution = ((AbstractOperation) operation).getExecution();
if (execution != null) {
commandContext.addInvolvedExecution(execution);
}
}
logger.debug("Operation {} added to agenda", operation.getClass());
}
protected void executeOperations(final CommandContext commandContext) {
while (!commandContext.getAgenda().isEmpty()) {
Runnable runnable = commandContext.getAgenda().getNextOperation();
executeOperation(runnable);
}
}
public void executeOperation(Runnable runnable) {
if (runnable instanceof AbstractOperation) {
AbstractOperation operation = (AbstractOperation) runnable;
// Execute the operation if the operation has no execution (i.e. it's an operation not working on a process instance)
// or the operation has an execution and it is not ended
if (operation.getExecution() == null || !operation.getExecution().isEnded()) {
if (logger.isDebugEnabled()) {
logger.debug("Executing operation {} ", operation.getClass());
}
runnable.run();//执行command.execute(commandContext)
}
} else {
runnable.run();
}
}
执行StartProcessInstanceCmd的execute方法
public ProcessInstance execute(CommandContext commandContext) {
DeploymentManager deploymentCache = commandContext.getProcessEngineConfiguration().getDeploymentManager();
// Find the process definition
ProcessDefinition processDefinition = null;
if (processDefinitionId != null) {
processDefinition = deploymentCache.findDeployedProcessDefinitionById(processDefinitionId);
if (processDefinition == null) {
throw new ActivitiObjectNotFoundException("No process definition found for id = '" + processDefinitionId + "'", ProcessDefinition.class);
}
} else if (processDefinitionKey != null && (tenantId == null || ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId))) {
processDefinition = deploymentCache.findDeployedLatestProcessDefinitionByKey(processDefinitionKey);
if (processDefinition == null) {
throw new ActivitiObjectNotFoundException("No process definition found for key '" + processDefinitionKey + "'", ProcessDefinition.class);
}
} else if (processDefinitionKey != null && tenantId != null && !ProcessEngineConfiguration.NO_TENANT_ID.equals(tenantId)) {
processDefinition = deploymentCache.findDeployedLatestProcessDefinitionByKeyAndTenantId(processDefinitionKey, tenantId);
if (processDefinition == null) {
throw new ActivitiObjectNotFoundException("No process definition found for key '" + processDefinitionKey + "' for tenant identifier " + tenantId, ProcessDefinition.class);
}
} else {
throw new ActivitiIllegalArgumentException("processDefinitionKey and processDefinitionId are null");
}
processInstanceHelper = commandContext.getProcessEngineConfiguration().getProcessInstanceHelper();
//核心方法:创建和发起流程实例
ProcessInstance processInstance = createAndStartProcessInstance(processDefinition, businessKey, processInstanceName, variables, transientVariables);
return processInstance;
}
//中间方法太多,直接跳转到核心逻辑
public ProcessInstance createAndStartProcessInstanceWithInitialFlowElement(ProcessDefinition processDefinition,
String businessKey, String processInstanceName, FlowElement initialFlowElement,
Process process, Map<String, Object> variables, Map<String, Object> transientVariables, boolean startProcessInstance) {
CommandContext commandContext = Context.getCommandContext();
// Create the process instance
String initiatorVariableName = null;
if (initialFlowElement instanceof StartEvent) {
initiatorVariableName = ((StartEvent) initialFlowElement).getInitiator();
}
//操作数据库重点****
ExecutionEntity processInstance = commandContext.getExecutionEntityManager()
.createProcessInstanceExecution(processDefinition, businessKey, processDefinition.getTenantId(), initiatorVariableName);
commandContext.getHistoryManager().recordProcessInstanceStart(processInstance, initialFlowElement);
processInstance.setVariables(processDataObjects(process.getDataObjects()));
// Set the variables passed into the start command
if (variables != null) {
for (String varName : variables.keySet()) {
processInstance.setVariable(varName, variables.get(varName));
}
}
if (transientVariables != null) {
for (String varName : transientVariables.keySet()) {
processInstance.setTransientVariable(varName, transientVariables.get(varName));
}
}
// Set processInstance name
if (processInstanceName != null) {
processInstance.setName(processInstanceName);
commandContext.getHistoryManager().recordProcessInstanceNameChange(processInstance.getId(), processInstanceName);
}
// Fire events
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher()
.dispatchEvent(ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED, processInstance, variables, false));
}
// Create the first execution that will visit all the process definition elements
ExecutionEntity execution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
//关键步骤,execution执行实例当前活动元素:startEvent
execution.setCurrentFlowElement(initialFlowElement);
if (startProcessInstance) {
startProcessInstance(processInstance, commandContext, variables);
}
return processInstance;
}
//****
org.activiti.engine.impl.persistence.entity.ExecutionEntityManagerImpl.java
@Override
public ExecutionEntity createProcessInstanceExecution(ProcessDefinition processDefinition, String businessKey, String tenantId, String initiatorVariableName) {
ExecutionEntity processInstanceExecution = executionDataManager.create();
if (isExecutionRelatedEntityCountEnabledGlobally()) {
((CountingExecutionEntity) processInstanceExecution).setCountEnabled(true);
}
processInstanceExecution.setProcessDefinitionId(processDefinition.getId());
processInstanceExecution.setProcessDefinitionKey(processDefinition.getKey());
processInstanceExecution.setProcessDefinitionName(processDefinition.getName());
processInstanceExecution.setProcessDefinitionVersion(processDefinition.getVersion());
processInstanceExecution.setBusinessKey(businessKey);
processInstanceExecution.setScope(true); // process instance is always a scope for all child executions
// Inherit tenant id (if any)
if (tenantId != null) {
processInstanceExecution.setTenantId(tenantId);
}
String authenticatedUserId = Authentication.getAuthenticatedUserId();
processInstanceExecution.setStartTime(Context.getProcessEngineConfiguration().getClock().getCurrentTime());
processInstanceExecution.setStartUserId(authenticatedUserId);
// Store in database,往数据库写数据
insert(processInstanceExecution, false);
if (initiatorVariableName != null) {
processInstanceExecution.setVariable(initiatorVariableName, authenticatedUserId);
}
// Need to be after insert, cause we need the id
processInstanceExecution.setProcessInstanceId(processInstanceExecution.getId());
processInstanceExecution.setRootProcessInstanceId(processInstanceExecution.getId());
if (authenticatedUserId != null) {
getIdentityLinkEntityManager().addIdentityLink(processInstanceExecution, authenticatedUserId, null, IdentityLinkType.STARTER);
}
// Fire events
if (getEventDispatcher().isEnabled()) {
getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_CREATED, processInstanceExecution));
}
return processInstanceExecution;
}
org.activiti.engine.impl.util.ProcessInstanceHelper#startProcessInstance关键步骤,开始事件流转到下一个节点的核心:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DGb2HslX-1615645423635)(images\image-20210105145809357.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fneEsB27-1615645423636)(images\image-20210105150430095.png)]
public void startProcessInstance(ExecutionEntity processInstance, CommandContext commandContext, Map<String, Object> variables) {
Process process = ProcessDefinitionUtil.getProcess(processInstance.getProcessDefinitionId());
// Event sub process handling
List<MessageEventSubscriptionEntity> messageEventSubscriptions = new LinkedList<>();
for (FlowElement flowElement : process.getFlowElements()) {
if (flowElement instanceof EventSubProcess) {
EventSubProcess eventSubProcess = (EventSubProcess) flowElement;
for (FlowElement subElement : eventSubProcess.getFlowElements()) {
if (subElement instanceof StartEvent) {
StartEvent startEvent = (StartEvent) subElement;
if (CollectionUtil.isNotEmpty(startEvent.getEventDefinitions())) {
EventDefinition eventDefinition = startEvent.getEventDefinitions().get(0);
if (eventDefinition instanceof MessageEventDefinition) {
MessageEventDefinition messageEventDefinition = (MessageEventDefinition) eventDefinition;
BpmnModel bpmnModel = ProcessDefinitionUtil.getBpmnModel(processInstance.getProcessDefinitionId());
if (bpmnModel.containsMessageId(messageEventDefinition.getMessageRef())) {
messageEventDefinition.setMessageRef(bpmnModel.getMessage(messageEventDefinition.getMessageRef()).getName());
}
ExecutionEntity messageExecution = commandContext.getExecutionEntityManager().createChildExecution(processInstance);
messageExecution.setCurrentFlowElement(startEvent);
messageExecution.setEventScope(true);
messageEventSubscriptions
.add(commandContext.getEventSubscriptionEntityManager().insertMessageEvent(messageEventDefinition.getMessageRef(), messageExecution));
}
}
}
}
}
}
ExecutionEntity execution = processInstance.getExecutions().get(0); // There will always be one child execution created
//重点:加入执行队列 # 1.1
commandContext.getAgenda().planContinueProcessOperation(execution);
if (Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
ActivitiEventDispatcher eventDispatcher = Context.getProcessEngineConfiguration().getEventDispatcher();
//dispatchEvent PROCESS_STARTED
eventDispatcher.dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variables, false));
for (MessageEventSubscriptionEntity messageEventSubscription : messageEventSubscriptions) {
commandContext.getProcessEngineConfiguration().getEventDispatcher()
.dispatchEvent(ActivitiEventBuilder.createMessageEvent(ActivitiEventType.ACTIVITY_MESSAGE_WAITING, messageEventSubscription.getActivityId(),
messageEventSubscription.getEventName(), null, messageEventSubscription.getExecution().getId(),
messageEventSubscription.getProcessInstanceId(), messageEventSubscription.getProcessDefinitionId()));
}
}
}
// 1.1 计划执行操作ContinueProcessOperation
@Override
public void planOperation(Runnable operation) {
operations.add(operation);
if (operation instanceof AbstractOperation) {
ExecutionEntity execution = ((AbstractOperation) operation).getExecution();
if (execution != null) {
commandContext.addInvolvedExecution(execution);
}
}
logger.debug("Operation {} added to agenda", operation.getClass());
}
// 1.2 死循环队列中有数据就执行
protected void executeOperations(final CommandContext commandContext) {
while (!commandContext.getAgenda().isEmpty()) {
Runnable runnable = commandContext.getAgenda().getNextOperation();
executeOperation(runnable);
}
}
//2.1 ContinueProcessOperation
@Override
public void run() {
FlowElement currentFlowElement = getCurrentFlowElement(execution);
if (currentFlowElement instanceof FlowNode) {
continueThroughFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
continueThroughSequenceFlow((SequenceFlow) currentFlowElement);
} else {
throw new ActivitiException("Programmatic error: no current flow element found or invalid type: " + currentFlowElement + ". Halting.");
}
}
//2.2 ***** continueThroughFlowNode startEvent->sequence
protected void continueThroughFlowNode(FlowNode flowNode) {
// Check if it's the initial flow element. If so, we must fire the execution listeners for the process too
if (flowNode.getIncomingFlows() != null
&& flowNode.getIncomingFlows().size() == 0
&& flowNode.getSubProcess() == null) {
executeProcessStartExecutionListeners();
}
// For a subprocess, a new child execution is created that will visit the steps of the subprocess
// The original execution that arrived here will wait until the subprocess is finished
// and will then be used to continue the process instance.
if (flowNode instanceof SubProcess) {
createChildExecutionForSubProcess((SubProcess) flowNode);
}
if (flowNode instanceof Activity && ((Activity) flowNode).hasMultiInstanceLoopCharacteristics()) {
// the multi instance execution will look at async
executeMultiInstanceSynchronous(flowNode);
} else if (forceSynchronousOperation || !flowNode.isAsynchronous()) {
executeSynchronous(flowNode);
} else {
executeAsynchronous(flowNode);
}
}
protected void executeSynchronous(FlowNode flowNode) {
commandContext.getHistoryManager().recordActivityStart(execution);
// Execution listener: event 'start'
if (CollectionUtil.isNotEmpty(flowNode.getExecutionListeners())) {
executeExecutionListeners(flowNode,
ExecutionListener.EVENTNAME_START);
}
// Execute any boundary events, sub process boundary events will be executed from the activity behavior
if (!inCompensation && flowNode instanceof Activity) { // Only activities can have boundary events
List<BoundaryEvent> boundaryEvents = ((Activity) flowNode).getBoundaryEvents();
if (CollectionUtil.isNotEmpty(boundaryEvents)) {
executeBoundaryEvents(boundaryEvents,
execution);
}
}
// Execute actual behavior
ActivityBehavior activityBehavior = (ActivityBehavior) flowNode.getBehavior();
if (activityBehavior != null) {
executeActivityBehavior(activityBehavior,
flowNode);
} else {
logger.debug("No activityBehavior on activity '{}' with execution {}",
flowNode.getId(),
execution.getId());
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution,
true);
}
}
//3.1 executeActivityBehavior(元素Behavior)*****
protected void executeActivityBehavior(ActivityBehavior activityBehavior,
FlowNode flowNode) {
logger.debug("Executing activityBehavior {} on activity '{}' with execution {}",
activityBehavior.getClass(),
flowNode.getId(),
execution.getId());
if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createActivityEvent(ActivitiEventType.ACTIVITY_STARTED,
flowNode.getId(),
flowNode.getName(),
execution.getId(),
execution.getProcessInstanceId(),
execution.getProcessDefinitionId(),
flowNode));
}
try {
activityBehavior.execute(execution);
} catch (RuntimeException e) {
if (LogMDC.isMDCEnabled()) {
LogMDC.putMDCExecution(execution);
}
throw e;
}
}
//3.2执行org.activiti.engine.impl.bpmn.behavior.FlowNodeActivityBehavior
/**
* Default behaviour: just leave the activity with no extra functionality.
*/
public void execute(DelegateExecution execution) {
leave(execution);
}
protected void performOutgoingBehavior(ExecutionEntity execution,
boolean checkConditions,
boolean throwExceptionIfExecutionStuck) {
Context.getAgenda().planTakeOutgoingSequenceFlowsOperation(execution,
true);
}
@Override
public void planTakeOutgoingSequenceFlowsOperation(ExecutionEntity execution, boolean evaluateConditions) {
planOperation(new TakeOutgoingSequenceFlowsOperation(commandContext, execution, evaluateConditions));
}
//TakeOutgoingSequenceFlowsOperation
//离开ExecutionEntity当前所在的FlowElement}的操作,并让它遵循顺序流。
@Override
public void run() {
FlowElement currentFlowElement = getCurrentFlowElement(execution);
// Compensation check
if ((currentFlowElement instanceof Activity)
&& (((Activity) currentFlowElement)).isForCompensation()) {
cleanupCompensation();
return;
}
// When leaving the current activity, we need to delete any related execution (eg active boundary events)
cleanupExecutions(currentFlowElement);
if (currentFlowElement instanceof FlowNode) {
handleFlowNode((FlowNode) currentFlowElement);
} else if (currentFlowElement instanceof SequenceFlow) {
handleSequenceFlow();
}
}
protected void handleFlowNode(FlowNode flowNode) {
handleActivityEnd(flowNode);
if (flowNode.getParentContainer() != null
&& flowNode.getParentContainer() instanceof AdhocSubProcess) {
handleAdhocSubProcess(flowNode);
} else {
//startEvent
leaveFlowNode(flowNode);
}
}
protected void leaveFlowNode(FlowNode flowNode) {
logger.debug("Leaving flow node {} with id '{}' by following it's {} outgoing sequenceflow",
flowNode.getClass(),
flowNode.getId(),
flowNode.getOutgoingFlows().size());
// Get default sequence flow (if set)
String defaultSequenceFlowId = null;
if (flowNode instanceof Activity) {
defaultSequenceFlowId = ((Activity) flowNode).getDefaultFlow();
} else if (flowNode instanceof Gateway) {
defaultSequenceFlowId = ((Gateway) flowNode).getDefaultFlow();
}
// Determine which sequence flows can be used for leaving
List<SequenceFlow> outgoingSequenceFlows = new ArrayList<SequenceFlow>();
for (SequenceFlow sequenceFlow : flowNode.getOutgoingFlows()) {
String skipExpressionString = sequenceFlow.getSkipExpression();
if (!SkipExpressionUtil.isSkipExpressionEnabled(execution,
skipExpressionString)) {
if (!evaluateConditions
|| (evaluateConditions && ConditionUtil.hasTrueCondition(sequenceFlow,
execution) && (defaultSequenceFlowId == null || !defaultSequenceFlowId.equals(sequenceFlow.getId())))) {
outgoingSequenceFlows.add(sequenceFlow);
}
} else if (flowNode.getOutgoingFlows().size() == 1 || SkipExpressionUtil.shouldSkipFlowElement(commandContext,
execution,
skipExpressionString)) {
// The 'skip' for a sequence flow means that we skip the condition, not the sequence flow.
outgoingSequenceFlows.add(sequenceFlow);
}
}
// Check if there is a default sequence flow
if (outgoingSequenceFlows.size() == 0 && evaluateConditions) { // The elements that set this to false also have no support for default sequence flow
if (defaultSequenceFlowId != null) {
for (SequenceFlow sequenceFlow : flowNode.getOutgoingFlows()) {
if (defaultSequenceFlowId.equals(sequenceFlow.getId())) {
outgoingSequenceFlows.add(sequenceFlow);
break;
}
}
}
}
// No outgoing found. Ending the execution
if (outgoingSequenceFlows.size() == 0) {
if (flowNode.getOutgoingFlows() == null || flowNode.getOutgoingFlows().size() == 0) {
logger.debug("No outgoing sequence flow found for flow node '{}'.",
flowNode.getId());
Context.getAgenda().planEndExecutionOperation(execution);
} else {
throw new ActivitiException("No outgoing sequence flow of element '" + flowNode.getId() + "' could be selected for continuing the process");
}
} else {
// Leave, and reuse the incoming sequence flow, make executions for all the others (if applicable)
ExecutionEntityManager executionEntityManager = commandContext.getExecutionEntityManager();
List<ExecutionEntity> outgoingExecutions = new ArrayList<ExecutionEntity>(flowNode.getOutgoingFlows().size());
SequenceFlow sequenceFlow = outgoingSequenceFlows.get(0);
// Reuse existing one
execution.setCurrentFlowElement(sequenceFlow);
execution.setActive(true);
outgoingExecutions.add((ExecutionEntity) execution);
// Executions for all the other one
if (outgoingSequenceFlows.size() > 1) {
for (int i = 1; i < outgoingSequenceFlows.size(); i++) {
ExecutionEntity parent = execution.getParentId() != null ? execution.getParent() : execution;
ExecutionEntity outgoingExecutionEntity = commandContext.getExecutionEntityManager().createChildExecution(parent);
SequenceFlow outgoingSequenceFlow = outgoingSequenceFlows.get(i);
outgoingExecutionEntity.setCurrentFlowElement(outgoingSequenceFlow);
executionEntityManager.insert(outgoingExecutionEntity);
outgoingExecutions.add(outgoingExecutionEntity);
}
}
// Leave (only done when all executions have been made, since some queries depend on this)
for (ExecutionEntity outgoingExecution : outgoingExecutions) {
Context.getAgenda().planContinueProcessOperation(outgoingExecution);
}
}
}