一、工作流与微服务编排
1. 工作流
提到工作流,印象里都是OA系统各种请假审批流。事实上,广义上的工作流是对工作流程及其各操作步骤之间业务规则的抽象、概括、描述。简单理解,我们为了实现某个业务目标,抽象拆解出来的一系列步骤及这些步骤之间的协作关系,就是工作流。例如订单发货流、程序构建流等。业界通常用BPMN流程图来描述一个工作流。
(1)没有工作流时的任务协作
以实现一个用户购买逻辑为例,如果不应用工作流模型,我们串联多个任务(步骤)一般是通过显示的代码调用:
校验、支付、发货一气呵成,流畅自然。正喝着枸杞红枣,产品一脸笑意跑过来:“我们新搞个充会员卡的业务,大概步骤就校验 -> 推荐 -> 支付 -> 充值。校验和支付前面都做过了,应该很快实现吧?”
精通if-else的你,听完的瞬间就已经构思好了代码:
一通写下来,总感觉哪里不对,“为什么加新的任务节点,要改已有的代码呢?这不符合开闭原则啊!”。
(2)应用工作流模型的任务协作
工作流模型正是为了解决这类问题而生:分离任务的实现和任务的协作关系。上面同样的用户购物逻辑,有了工作流模型,各个任务只实现自己原子的逻辑,任务协作关系使用流程图来表达。
当新的逻辑需要复用已有任务节点时,只需要调整流程图,无需修改已有代码。
2. 工作流引擎
将任务实现与任务协作关系分离之后,就诞生了专门维护任务协作关系的程序 - 工作流引擎(也常称作流程引擎)。
其中最具有代表性的就非Activiti莫属了。在企业应用蓬勃发展的21世纪初,它几乎是实现流程自动化的标配。关于Activiti的介绍,网上已经有足够多的文章。今天我们要介绍是由Activiti的核心成员打造的另一款专为微服务编排而生的工作流引擎 - Zeebe。在开始之前,我们先理解下什么是微服务编排。
3. 微服务编排
微服务架构的一大核心是把大的复杂的业务系统拆分成高内聚的微服务,每个服务负责相对独立的逻辑。例如一个电商系统,可能会拆分出支付微服务、订单微服务、仓储微服务、物流微服务等。服务拆分的好处无需赘述,但是要实现业务价值,不是看单个服务的能力,而是要协调所有服务保证企业端到端业务流的成功。
那么,哪个服务来负责端到端业务流的成功呢?答案是没有。事实上,在公司内,端到端的业务流可能都没有正式的文档说明,从一个微服务到另一个微服务的事件流转都是在代码里隐式表达的。 很多微服务架构依赖一种相对纯粹的编舞模式(choreography pattern)来解决这个问题。在这种模式下,微服务通过向一个消息队列发送和接收事件来相互协作。编舞模式给开发者提供了很高的灵活度,但是编舞模式仍不能解决:
可见性:多少端到端业务流正在运行中,它们的状态是什么样子。过去24小时,有多少业务流实例没有成功结束?为什么这些业务流实例没有成功结束?一个业务流或者某个任务完成的平均时间是多少?
异常处理:如果业务流里有一个微服务失败,谁负责处理这个异常?业务流的重试逻辑是怎么样的?如果需要人工介入,问题的升级处理规则是怎么样的?
于是便诞生了一种更严格的编排模式(orchestration pattern),用于协调各个微服务。在这种模式下,会有一个中控的引擎:
按照业务逻辑的蓝图,编排各个微服务的调用关系;
监控整个业务流的状态;
提供自动化的机制处理单个服务的失败,保证整个业务流的成功。
可以借用下面的图,来进一步理解微服务编排和微服务编舞模式的区别:
按照我们前面对工作流模型的阐述,工作流引擎很适合作为中控引擎,来编排调度微服务。那为什么诸如Activiti等传统的工作流引擎没能继续占领微服务编排的市场,而是诞生了新的微服务编排引擎-Zeebe?更有趣的是,Zeebe的核心开发,也是来自最初的Activiti团队。 答案是诸如Activiti等传统工作流引擎的架构无法适应当下微服务的场景:
传统的工作流引擎,编排的大部分是人工审批任务,意味着任务流转效率低,系统吞吐低。而当下微服务大部分是程序化的自动任务,意味着任务高效流转,系统吞吐高。单点架构、同步响应、高度依赖DB的Activiti,显然支撑不了这样的场景。
Activiti等工作流引擎,通常都以jar包的形式,嵌入到业务程序中,直接通过调用本地方法的方式调度起业务TaskHandler。在单体架构下,这种集成方式简单易用。但是在微服务架构下,工作流的任务往往是分布在多个服务的,而且同一个服务往往还会根据负载情况部署不同数量的实例。如果还是采用引擎主动调用的方式,怎么寻址到具体的TaskHandler?当后端业务服务处理能力本身是瓶颈的时候,如果引擎还是不断的调用,只会进一步压垮服务。
而Zeebe在设计之初,就考虑到了这些问题,下文来为大家详细介绍。
二、Zeebe特性与顶层架构
1. Zeebe核心特性
Zeebe是专为微服务编排设计的免费开源的工作流引擎,它提供了:
可见性(visibility):Zeebe提供能力展示出企业工作流运行状态,包括当前运行中的工作流数量、平均耗时、工作流当前的故障和错误等;
工作流编排(workflow orchestration):基于工作流的当前状态,Zeebe以事件的形式发布指令(command),这些指令可以被一个或多个微服务消费,确保工作流任务可以按预先的定义流转;
监控超时(monitoring for timeouts)或其他流程错误:同时提供能力配置错误处理方式,比如有状态的重试或者升级给运维团队手动处理,确保工作流总是能按计划完成。
Zeebe设计之初,就考虑了超大规模的微服务编排问题。为了应对超大规模,Zeebe支持:
横向扩容(horizontal scalability):Zeebe支持横向扩容并且不依赖外部的数据库,相反的,Zeebe直接把数据写到所部署节点的文件系统里,然后在集群内分布式的计算处理,实现高吞吐;
容错(fault tolerance):通过简单配置化的副本机制,确保Zeebe能从软硬件故障中快速恢复,并且不会有数据丢失;
消息驱动架构(message-driven architecture):所有工作流相关事件被写到只追加写的日志(append-only log)里;
发布-订阅交互模式(publish-subscribe interaction model):可以保证连接到Zeebe的微服务根据实际的处理能力,自主的消费事件执行任务,同时提供平滑流量和背压的机制;
BPMN2.0标准(Visual workflows modeled in ISO-standard BPMN 2.0):保证开发和业务能够使用相同的语言协作设计工作流;
语言无关的客户端模型(language-agnostic client model):可以使用任何编程语言构建Zeebe客户端。
2. Zeebe架构
Zeebe架构主要包含4大组件:client, gateway, brokers 以及 exporters.
(1)Client
客户端向Zeebe发送指令:
发布工作流(deploy workflows)
执行业务逻辑(carry out business logic)
-创建工作流实例(start workflow instances) -发布消息(publish messages) -激活任务(activate jobs) -完成任务(complete jobs) -失败任务(fail jobs)
- 处理运维问题(handle operational issues)
-更新实例流程变量(update workflow instance variables) -解决异常(resolve incidents)
客户端程序可以完全独立于Zeebe扩缩容,Zeebe brokers不执行任何业务逻辑。客户端是嵌入到应用程序(执行业务逻辑的微服务)的库,用于跟Zeebe集群连接通信。客户端通过基于HTTP/2协议的gRPC与Zeebe gateway连接。
Zeebe官方提供了Java和Go客户端。社区提供了C#,Ruby,JavaScript客户端实现。gRPC协议很方便生成其他语言的客户端。
Client中,执行单独任务的单元叫JobWorker。
(2)Gateway
Gateway作为Zeebe集群的入口,转发请求到brokers。Gateway是无状态(stateless)无会话(sessionless)的,可以按需增加节点,以负载均衡及高可用。
(3)Broker
Broker是分布式的流程引擎,维护运行中流程实例的状态。Brokers可以分区以实现横向扩容、副本以实现容错。通常情况下,Zeebe集群都不止一个节点。
需要重点强调的是,broker不包含任何业务逻辑,它只负责:
- 处理客户端发送的指令
- 存储和管理运行中流程实例的状态
分配任务给job workers
Brokes形成一个对等网络(peer-to-peer),这样集群不会有单点故障。集群中所有节点都承担相同的职责,所以一个节点不可用后,节点的任务会被透明的重新分配到网络中其他节点。
(4)Exporter
Exporter系统提供Zeebe内状态变化的事件流。这些事件流数据有很多潜在用处,包括但不限于:
- 监控当前运行流程实例的状态
- 分析历史的工作流数据以做审计或BI
跟踪Zeebe抛出的异常(incident)
Exporter提供了简洁的API,可以流式导出数据到任何存储系统。Zeebe官方提供开箱即用的Elasticsearch exporter,社区也提供了其他Exporters。
三、Zeebe内部核心实现
Zeebe能做到高吞吐、高可用的微服务编排,得益于三个关键实现:
1. 分布式 Zeebe设计之初就考虑了分布式部署,可以在不依赖外部组件的情况下,搭建一个Zeebe broker集群,集群中节点组成一个对等的网络(peer-to-peer network)。在网络中,所有的节点都有相同的职责,保证集群不会有单点故障。
Zeebe内部抽象了一个只追加写的队列(可以类比理解成kafka的topic),来处理和存储数据。当集群有多个broker节点时,会将队列划分成多个分区(partitions,或者分片shards),分布到各个节点上。每个分区有多个副本(replicas)。在所有的副本中,会根据raft协议选出一个leader,leader负责接收请求和执行所有处理逻辑。其他broker上的副本就是被动的跟随者(passive followers)。当leader不可用时,followers会透明地选出新的leader。
2. 消息驱动 Zeebe消息驱动架构,体现在两个方面:
- Zeebe Broker内部使用队列(即LogStream,只追加写),异步处理请求;
Zeebe JobWorker和Broker使用发布订阅的模式交互,当工作流任务状态发生变化,Broker会发布相应事件。JobWorker通过轮询的方式,订阅处理自己相关的事件。
(1)Broker内部流处理模型 Zeebe内部实现,其实就是一系列作用在记录流(record streams)上的流处理器(stream processors)。流处理模型作为一个统一的实现方式,提供:
- 指令协议(command protocol,即请求响应)
- 记录导出(record export / streaming)
工作流演算(evaluation, 异步后台任务)
a. 状态机(state machines) Zeebe管理有状态的实体:任务、工作流实例等。在内部,这些实体实现为流处理器管理的状态机。状态机模式的概念很简单。一个状态机的实例总是处于某个逻辑状态。对于每一个状态,一系列转换(transitions)定义了下一步可能的状态。转换到下一个新状态可能产生输出或者副作用(side effects)。
如下图是任务(jobs)的状态机:
椭圆代表状态,箭头代表状态转换。要注意的是,状态转换只能用在特定的状态上。比如,不能在任务处于CREATED状态的时候完成(complete)任务。
b. 事件和指令(events and commands) 状态机里的每个状态变化被称为事件(event)。Zeebe会把每个事件当成一条记录发布到流上。状态变化可以通过发送指令触发。Zeebe broker从两个源头接收指令:
- 客户端发送指令。例如:发布工作流、启动流程实例、创建和完成任务等;
broker自身产生指令。例如:查找可以被worker执行的任务。
指令一旦接收到,就会被当做记录写到流里。
c. 有状态的流处理(stateful stream processing)
流处理器从流里有序地读取记录,然后根据记录关联的实体的生命周期,解析指令。流处理器循环的执行下面的步骤:
- 从流里消费指令(command)
- 根据状态生命周期和实体当前状态,判断指令是否适用
- 如果指令适用,应用到状态机。如果指令是客户端发送的,发送回响应。
- 如果指令不适用,拒绝。如果是客户端发过来的,发送错误响应信息。
发布新的事件,报告实体新的状态。
例如:处理Create Job指令,会产生Job Created事件。
d. 指令触发器(command triggers)
一个实体的状态变化可以自动触发针对另一个实体的指令。例如:当一个任务完成了,相应的流程实例应该继续后续的任务,也就是说,Job Completed事件触发了Complete Activity指令。
e. 处理背压(handling back-pressure) 当broker收到客户端请求,会先把请求写到事件流里,然后交由流处理器处理。如果处理太慢或者流里面堆积了太多客户端请求,处理器可能需要花很长时间才能处理新接收到的请求指令。如果broker继续从客户端接收新请求,待处理的任务(back log)会不断增加,任务处理延时会超过可以接收的时间。为了避免这种问题,Zeebe采用了一种背压机制。当broker接收到的请求超过其能在一定延时范围内处理的限度,broker就会拒绝(reject)一些请求。 broker能处理的最大请求速率取决于机器的处理能力、网络延时、系统的当前负载等。因此,Zeebe不会配置固定的最大请求限度。相反的,Zeebe采用自适应的算法动态的决定inflight(broker已经接收但是还没处理完成的)请求数限度。当新的请求被接收时,Inflight请求数增加;当请求处理完成,响应发回给客户端后, Inflight请求数减少。当Inflight请求数达到限度,broker拒绝后续的请求。 如果broker因为背压拒绝了客户端请求,客户端可以用合适的重试策略重试。如果拒绝率一直很高,说明broker持续处于高负载。在这种情况下,推荐降低请求速率。
(2)Broker和JobWorker发布订阅模式 业务微服务集成Zeebe client SDK,针对每个JobType,创建一个JobWorker,在JobWorker里实现业务逻辑。JobWorker的创建很简单,以Java SDK为例,在普通的Java方法上添加一个注解即可。
@Component
@Slf4j
public class SomeJob {
@EnhancedJobWorker(type = "some-service.SomeJob")
public void handleTask(final EnhancedJobClient client, final ActivatedJob job) {
// 业务逻辑
// ....
// 根据业务逻辑执行情况,结单
if (success) {
client.completeJob(job);
} else {
throw SomeException("失败原因");
}}
与Activiti引擎主动调用业务handler的模式不同,JobWorker创建之后,内部会启动一个轮询线程JobPoller,定期(默认100ms)从Broker轮询自己相关的事件(job created)。
拿到Broker端返回的任务后,会把任务信息传入业务逻辑Handler里执行。整个时序如下图:
Zeebe客户端主动轮询的模型,进一步解耦了引擎任务状态维护和微服务业务处理逻辑,可以让业务JobWorker根据自己的处理能力,以相对恒定的速率处理任务。当短时间内有大量任务创建时,Broker的队列模型,可以堆积任务,平滑流量。
3. 运行时数据和历史数据分离 当Zeebe处理任务、工作流或者内部维护时,会产生有序的记录流:
虽然客户端没办法直接审查流,但是Zeebe可以用exporter的方式,加载和配置用户代码处理每条记录。exporter提供了统一的入口处理写到流里的记录:
- 通过把历史数据推到外部数据仓库中,持久化历史数据
把记录导出到可视化工具中(例如: zeebe-simple-monitor)
Zeebe只会装载通过Zeebe YAML配置文件配置的exporters。exporter配置好后,会在Zeebe下次启动的时候,开始接收记录。需要注意的是,也只能保证接收到从那以后的数据。exporter最大的作用是可以减少Zeebe集群无限存储数据的压力。当Zeebe不再需要某些数据时,会先查询exporters看是否可以安全删除这些数据,如果可以,就会永久的删除这些数据,因此可以减少集群磁盘占用。不管exporter怎么装载(是否通过外部jar包),所有的exporters都同样的以Exporter interface定义的方式与broker交互。
(1)装载(Loading) exporter配置好后,会在broker的启动阶段装载。在装载阶段,会去校验每个exporter的配置,如果有下面的问题,broker会启动失败:
- exporter id不唯一
- exporter指向不存在或者不能访问(non-accessible)的JAR包
- exporter指向不存在或者不能实例化(non-instantiable)的类
exporter实例在Exporter#configure方法抛异常
(2)处理(Processing) 在任何时候,对于某个partition,只会有一个主节点(leader node)。当节点变成某个partition的leader时,它要做的事情之一就是运行一个exporter stream processor实例。这个流处理器,会给每个配置好的exporter创建一个实例,然后把每条记录都转发到这些exporter实例上。
这意味着,对于每个分区的exporter,只会有且只有一个实例:如果有4个分区,并且有至少4个线程处理记录,那么可能有4个exporter实例同时在导出记录。
注意Zeebe只保证至少一次(at-least-once)的语义,也就是说,一条记录至少会被一个exporter看到一次,可能会多次。在这些情况下,可能会重复:
- 在raft故障转移再处理(reprocessing)过程中(例如:选举新的leader)
偏移位没有更新出错
为了减少exporter处理的重复记录数,流处理器会记录每个exporter最新成功导出的记录的偏移位置(position)。有这个偏移位就足够了,因为流是有序的记录序列,在流中,偏移位(position)是单调递增的。这个偏移位是当exporter保证记录被成功导出后,exporter去更新维护的。
注意:虽然Zeebe尽力保证减少exporter处理的重复记录数,但是还是会出现重复记录,因此,有必要保证export操作的幂等性。可以在exporter代码中实现幂等性,但是如果导出到外部系统中,推荐在外部系统中做去重,这样降低Zeebe的负载。
(3)错误处理(Error handling) 如果错误出现在Exporter#open(Context)阶段,流处理器会失败然后重启修复错误。最坏的情况是,在错误修复前,没有exporter运行。
如果错误出现在Exporter#close阶段,错误信息会被记录到日志里,仍会保证其他exporter优雅完成工作。
如果错误出现在处理阶段,会无限重试同一条记录,直到不再产生错误。最坏的情况是,一个失败的exporter会导致其他exporters挂起。当前,需要exporter实现自己的重试/错误处理策略,后续可能改变。
(4)性能影响(Performance impact)
对于每个装载的exporter,Zeebe自然会带来一些性能影响。对于某个partition,一个慢的exporter会导致其他exporter也变慢,最坏的情况下,会完全阻塞住一个线程。因此推荐exporter的逻辑尽可能的简单,把数据增强和转换等逻辑都放在外部系统中。
四、上手体验
Zeebe作为Camunda公司的一个子项目,从2017年开始,独立开源运作。Camunda公司核心团队来自于早期的Activiti团队,主营以Activiti为核心的工作流咨询服务。从今年3月开始,Camunda宣布打包Zeebe引擎、控制台Operate、建模工具等提供SaaS服务,但核心的流程引擎Zeebe仍然以开源的方式迭代。 Zeebe不依赖外部组件,可以使用本地jar包、Docker等快速搭建集群环境【1】。参考官方指引【2】可以快速创建Demo项目。 官方给出的benchmark【3】,8核32G单节点4分区1副本的配置下,能达到3.2w instance/s的性能。而且增加Broker节点,基本可以线性提高处理能力。
在我们实际6个8C 16G的SKTE POD集群中,使用WeTest压测大师压测到了 3w instance/s的性能(每个流程2个节点,包含大约50个流程变量)。与官方数据有一定差距,受限于官方benchmark环境和配置给的不全,没办法完全模拟。看资源瓶颈主要在磁盘IO上。但这个数据相对我们之前Activiti 5~6 k instance/s就崩溃来说,已经是很大提升了。 另一方面,Zeebe还处于早期的迭代阶段,稳定性和功能完整度都还在持续优化中。目前支持到BPMN的ServiceTask任务,同时在任务调度上,也有小概率的miss掉任务的情况。但从一开始的架构设计、场景定位、社区活跃度等方面来看,是可以期待的项目。
立个flag,接下来笔者还会持续输出Zeebe内部实现的深入分析文章。分布式、异步化、Actor线程模型这些套路的实现还是很值得学习的。也欢迎对微服务编排、流程引擎感兴趣的同学拍砖交流~