参考:
https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af
引擎相关技术文档
流程引擎到平台的mq消息链路 | https://yuque.antfin-inc.com/hanjun.hw/yvmten2/ido5af |
---|---|
引擎核心介绍 | https://yuque.antfin-inc.com/alibpms/ssd5fr/bnn3qt |
加签的基础逻辑 | https://yuque.antfin-inc.com/alibpms/alipmc/append |
执行任务结束如何触发下一个任务序列 | https://yuque.antfin-inc.com/hanjun.hw/yvmten2/pi25gh |
如何解析取人规则 | https://yuque.antfin-inc.com/hanjun.hw/yvmten2/pi25gh |
什么时候激发下一个taskGroup | https://yuque.antfin-inc.com/hanjun.hw/hqdpat5/evis8a |
总览
- 孕育:这一步是生命的起源,对应"任务"就是:申办人因为办理业务而发起一个流程。这是任务产生的摇篮。
任务的使命就是为了完成业务;
- 生产:这是新生命产生的过程,对应"任务"就是:任务的创建和激活。
- 长大:婴儿长大了要办满月酒,办周岁,让亲人朋友分享喜悦。对应"任务"就是:通知审批人。
- 进入社会:小孩长大了,迟早要进入社会,面对不同人的看法和检阅。对应"任务"就是:给审批人审阅,确认填写资料是否正确,是否合规。
- 经受考验:人进入社会之后,会经受各种考验,如果能够成功顶住,就能获取飞跃般的成长。对应"任务"就是:合格合规,得到审批人的认可,不用被打回去重填。
- 过关斩将:人在考验中不断成长,日积月累,最终会进入更高一个阶段。 对应"任务"就是:审批人点击【同意】,任务进入下一个阶段。
- 死得其所:人一辈子修己,完成自己的使命,最后归于黄土,是一种圆满。 对应"任务"就是:从"RUNNING"状态进入"COMPLETED"状态,光荣完成自己的使命。
名词解释
审批任务
需要人进行审批的任务,以下简称"任务"
任务的状态
/**
* 非激活任务
*/
NEW,
/**
* 待办任务
*/
RUNNING,
/**
* 暂停任务
*/
PAUSED,
/**
* 取消/撤销
*/
CANCELED,
/**
* 完成
*/
COMPLETED,
/**
* 转交
*/
REDIRECTED
问题:
- 转交之后原任务的状态是什么?
TaskStatus.CANCELED
审批消息
MQ消息类型(此处指业务类型)说明
(只列举部分)
消息种类 | 说明 | 备注 |
---|---|---|
PROC_INST_START | 流程实例启动(发起流程实例) | |
TASK_BATCH_ACTIVATED | 任务激活 | |
TASK_COMPLETED | 任务完成 | |
TASK_CANCEL | 任务取消 | |
TASK_REDIRECTED | 任务转交 | |
PROCESS_INSTANCE_TERMINATE | 流程实例终止(比如申请人撤销) | |
PROC_INST_FINISH | 流程实例完成 | |
PROCESS_INSTANCE_PAUSED | 流程实例暂停 |
生命周期
01 生成流程实例
这是任务产生的摇篮,没有流程实例,就不可能有任务。
流程实例和任务是什么关系呢?一般一个流程实例有多个审批任务
什么时候会生成流程实例呢?
一般由业务活动触发
比如上图,在采购招投标 的过程中,有很多业务活动,标号(1)到(4)的业务活动都会触发流程实例的生成,即发起审批流。
流程实例生成之后,一定会生成任务吗?
不一定。比如下面的流程,只有一个"自动节点":
不会生成审批任务。
必须要有人工节点(下图标号1-3),才能生成审批任务:
(图01-03)
发起实例流程
(图01-04)
下面是创建流程实例发起的调度项,然后把调度项存储到数据库,再由调度引擎进行一步的调度:
ProcessInstanceStartResumptionExtension extension = new ProcessInstanceStartResumptionExtension(processInstance.getProcessId(), this.getGroupType());
extension.setParentProcessInstanceId(processInstance.getParentProcessInstanceId());
extension.setActivityId(activityId);
ResumptionDO resumption = new ResumptionDO(processInstance.getProcessInstanceId(), extension, processInstance.getAppKey());
this.flowEngine.asyncStartFlowByResumption(resumption);
asyncStartFlowByResumption 里面做了什么呢?
其实就是把调度项插入到数据库 **this**.resumptionRepositoryManager.insert(resumption, **false**);
在调度项处理程序中调用WorkflowRunnerImpl 发起流程
public void startWorkflow(String processInstanceId, long processId) {
WorkflowContext context = createWorkflowContext(processId, processInstanceId);
context.getWorkflow().start(context);
}
会从流程根节点找到第一个节点开始执行,首先找到的肯定是开始节点
(图01-05)
执行节点时,根据节点的类型,执行不同的Handler:
执行HumanActivityHandler(人工节点handle) 才能生成审批任务
发起流程实例之后,会发送一个变量更新的MQ消息:VariableChangeEvent
02 生成新任务
实例发起之后一定会生成任务吗?
只有人工节点才能生成审批任务
生成任务需要哪些准备工作?
- 在流程设计的时候,需要为每个人工节点 设置好业务规则(取人规则);
- 如果配置了条件节点,那么只有条件节点在运行态满足条件时才能走到审批节点;
- 发起实例,即填表(如下图)
(图01-06)
HumanActivityHandler(人工节点) 的执行逻辑
标号1:Activity activity = context.getFlowDefineModel().getActivityById(transition.getDestinationActivityId())
DefaultTransitionRouteHandler.externalExecute
标号2:ElementRuntimeManagerImpl.start
主要是检查前置拦截器,例如人工节点的前置条件
com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start 中主要是校验前置规则,
实际调用handleBeforeInterceptors
其实这是开始节点,因为流程实例启动时,是从开始节点开始执行的.
其实目前activity.getActivitySetting().getBeforeInterceptors() 是预留的,还没有使用到
基本this.check(elementToStart, context)都会直接返回true
标号3:ElementRuntimeManagerImpl.enter
执行_节点进入后的行为_
标号4:AbstractActivityBehavior.enter
实际会调用BpmActivityBehavior.executeActivity
然后根据activity.getActivitySetting().getIdentifyKey()(HUMAN) 获取到对应的handler,
h获取到HumanActivityHandler
标号5:获取节点实例并执行
BpmsActivityEventBuilder.createActivityStartEvent 的任务:
发送节点实例启动ActivityInstanceStartEvent的 MQ消息
同时执行HumanActivityHandler.perform(activityContext)
标号6:HumanActivityHandler.perform();
执行节点实例有两种方式:
(1)同步
(2)异步:先作为调度项ResumptionDO,存储到数据库,然后由调度引擎来异步的执行
标号7:_异步调度_;
这里要注意的是:此时并没有真正干活,而是把真正要干活的任务存储到了调度项。
下面是异步的方式获取刚才存进去的调度项,然后真正执行:
(调度引擎是如何一步一步来执行调度任务(这里只执行人工节点实例)的)
说明:
标号1:这里调度引擎开始调度后,真正干活了,所以叫ItemWorker;
标号2:completeResumptionSuccessed 方法名称感觉不太好,其实这里是真正干活了,即通过调度项找到具体的handler 去执行;
其中会调用 resumptionHandler.handle(resumption)
标号3:com.alibaba.workflowengine.adapter.resumption.handler.HumanStartResumptionHandler#handle 会做两件事;
(a)通过调度项里面的扩展信息ExtensionContext 获取节点id
(b)查询流程版本schema获取该人工节点的配置信息
标号4:taskSequenceManager.initAndActivateRootTaskSequenceInternational
是初始化人工节点,并激活;
标号5:;
标号6:激活任务序列;
HumanStartResumptionHandler.handle
taskSequenceManager.initAndActivateRootTaskSequenceInternational
initAndActivateRootTaskSequenceInternational 具体做了什么呢?
03 激活任务
激活任务的时机是什么时候?
一般任务在初始化之后,就会被激活
任务为什么还需要一个激活的动作、多此一举?
比如我收到一个审批任务时,我可能不太肯定,所以前加签给另外一个伙伴,那么我的原来审批任务的状态就变为(PAUSED),等前加签执行完之后,我的审批任务恢复为"激活"状态(RUNNING)
激活任务之后还需要发MQ消息
收到任务激活消息后做了什么处理?
此处是针对消息类型 为TASK_BATCH_ACTIVATED的 消息进行处理:
说明:
标号1:MQ消费者 监听到消息之后进行处理;
标号2:根据消息种类,转交给具体的handler 进行业务处理;
标号3:根据消息种类,转交给具体的handler 进行业务处理;
标号4:调用 发送审批通知 的逻辑;
标号5:根据"任务激活"消息,获取新激活的任务信息,然后发送审批消息(支持公版钉钉和专有钉钉);
标号6:组装消息卡片,调用钉钉open API 发送工作通知(审批消息);
取消任务
04 审批环节
审批人收到任务之后,能做哪些事情?
如果审批人发现自己处理不了应该怎么办?
目前有两种方式:
- 转交:把审批任务转交给其他人处理,自己原先的任务就会结束掉
- 多人填表
审批任务一定需要人工审批吗?
不一定,(see https://bpms.alibaba-inc.com/processdesigner/newProcDesign?processId=15880317601)
如果我们在人工节点 设置了"自动完成规则",那么系统会自动执行(同意或拒绝)该任务
05 审批操作
当审批人点击【同意】、【已处理】或【通过】(按钮名称是可以修改的)时,任务发生了什么变化?
调用接口:executeTask,
06 任务状态变化
任务的状态机是怎样的?
一个任务的状态变化会影响其他任务的状态吗?
07 为什么有任务组
任务组存在的核心目的是 支持审批节点的完成策略,
上面的人工节点(审批节点) 配置了两个审批人,那么到底怎么样算审批通过呢?
是其中一个人审批通过 就算审批通过(规则1),还是所有审批人都通过才算审批通过(规则2)呢、
这就是完成策略。
(以上面为例)实际会生成两个任务,这两个任务输入同一个 任务组(TaskGroup)
比如采用规则1,那么只要有一个人审批通过,那么直接把所在任务组(TaskGroup) 置为“完成",另外一个任务就会自动取消掉。
任务模型
任务的taskType为
MIDDLE_IN_ONE_BY_ONE 或 MIDDLE_IN_ALL_AT_ONCE
- 同时分配(捞单)MIDDLE_IN_ALL_AT_ONCE
- 逐级审批MIDDLE_IN_ONE_BY_ONE
08 任务完成
审批人点击审批页下面的【同意】、【已处理】或【通过】,如果不出意外,那么任务的状态就变为已完成.
09 激活下一个任务
执行完当前任务之后,如何激活下一个任务呢?
下面从标号14的地方说起:
com.alibaba.flowengine.ext.route.handler.impl.MultipleTransitionRouteHandler#externalExecute
- 找到当前节点的出线,
- 执行线规则;
- 找到出线的startElement,
- 按照前面的步骤执行 startElement
z执行开始节点
流程实例启动之后,第一个执行的就是开始节点实例
也是通过 com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start来执行的.
详细步骤:
- 开始节点的结构
- 检查节点的前置规则(目前都为空,这套规则没有用起来),最终调用 handleBeforeInterceptors
- 判断能够进入节点,最终调用executeActivity 方法;
- 发送MQ事件:ActivityInstanceStartEvent
- 调用StartActivityHandler .perform,直接返回true
其实这里可以考虑用来实现流程发起的白名单机制;
- 判断节点此时能够直接离开(完结)--通过com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#leave;
- 实际是校验后置拦截器 ,调用方法handleAfterInterceptors,基本activitySetting.getAfterInterceptors() 都为空(此处没有用起来)
- 如果可以离开,则执行this.countReduction,做减数;
- 获取路由调度器RouteHandler 进行路由,因为当前节点是开始节点,所以获取到的路由器是线规则路由(MultipleTransitionRouteHandler)
- z最终会执行 MultipleTransitionRouteHandler..execute(element, context);
获取开始节点的出线:Set<Transition> outgoingTransitions = activity.getOutTransitions()
- 通过检查mei每个出线的线规则,找到符合要求的出线,如果检查出线呢?通过 com.alibaba.flowengine.ext.transition.behavior.AbstractTransitionBehavior#evaluate ,elementRuntimeManager.check
- 判断能够进入出线 : this.elementRuntimeManager.enter(transition, context)
- 最终执行完路由,找到出线的出口节点transition.getDestinationActivityId(),即下一个人工节点;
- 然后又回到执行 com.alibaba.flowengine.ext.workflow.impl.ElementRuntimeManagerImpl#start,
此处其实是回到了步骤2,又开始下一轮循环
这里应该有一个误区,以为真个过程都是同步的.其实不是的.
上面是开始节点的执行,下面我们开始执行人工节点,其实人工节点的执行就是异步的.
人工节点的执行到底干了啥?
- 校验 activity.getActivitySetting().getBeforeInterceptors();
- 通过 activity.getActivitySetting().getIdentifyKey() 获取到具体的handler来执行,如果是人工节点,那么handler就是HumanActivityHandler;
- HumanActivityHandler 在执行过程中也分为异步和同步.如果是异步,则先把执行任务封装成为调度项存储到数据库;
- 调度项异步调度执行
思考
目前工作流的性能瓶颈在 发起流程实例, 因为大事务,频繁的锁流程实例导致并发能力不够高;
后续会优化的点有以下几点:
- 梳理整个流程流转/任务流转的状态机;
- 区分哪些是核心信息,哪些是非核心,如果是非核心信息,可以把一致性降低到最终一致性,提高性能;
后续的计划
- 流程实例发起白名单,可以通过人工加节点activity.getActivitySetting().getBeforeInterceptors() 来实现
- 和OA审批打通;
- 自动节点能力抽取,复用到更大的业务场景;