导读
Knative 作为主流的开源 Serverless Framework,除了提供应用部署、自动弹性之外,还提供事件驱动能力。在面对 AI 场景时,用户对事件对精细化处理以及节省资源成本都有较高对要求,这里介绍通过Knative实现事件驱动、事件分发与资源自动弹性,很好的满足用户场景的诉求。本文会从以下 3 个方面展开介绍:
- 事件驱动框架:Knative Eventing
- 事件驱动引擎:事件源介绍
- AI 事件驱动场景实践
Knative 模型
我们先来整体看一下 Knative 中两大核心模块,Serving 中是通过 Knative Service 提供应用模型,支持多版本管理、自动弹性以及流量灰度发布。Eventing 提供了事件驱动框架,包括事件源、Broker/Trigger模型。接下来我们主要介绍 Knative Eventing。
事件驱动框架:Eventing
Knative 的 Eventing 提供了完整的事件模型,可以很容易地接入各个外部系统的事件。事件接入以后通过 CloudEvent 标准在内部流转,结合 Broker/Trigger 机制给事件处理提供了非常理想的方式。
- Knative Eventing 提供的服务是松散耦合,可进行独立开发和部署。服务可跨平台使用(如 Kubernetes, VMs, SaaS 或者 FaaS)
- 事件的生产者和事件的消费者是相互独立的。
- 社区提供多种开箱即用的事件源
- 支持第三方消息系统,提供灵活的扩展性。
关键特性-事件规则
在 Eventing 中通过Broker/Trigger进行事件流转和分发,分发离不开规则的设置,Trigger 中提供了丰富的规则设置能力,如图所示:
- Trigger的filter的作用是对Event进行内容过滤。
- 支持对Event的Attribute以及Data的内容进行过滤。
- 支持Common Expression Language (CEL) 表达式过滤 。
- 支持通过SourceAndType进行过滤
事件驱动引擎: 事件源
事件源是事件驱动的引擎,Knative社区提供了丰富的事件源,如Kafka、GitHub等。此外还接入消息云产品事件源,如MNS、RocketMQ等。
这套完备的事件系统可以比较容易的实现事件驱动场景如下:
- 自动 CICD
- AI 音视频场景
- 比如和各种云产品的事件对接,从而实现云产品状态更新自动触发一个服务等
RocketMQ事件源
消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
RocketMQSource 是 Knative 平台的 RocketMQ 事件源。其可以将 RocketMQ 集群的消息以Cloud Event的格式实时转发到Knative 平台,是 Apahe RocketMQ 和 Knative 之间的连接器。
- 对接RocketMQ消息系统
- 消息分发到 Knative Service服务
Kafka事件源
消息队列 Kafka 版是阿里云基于 Apache Kafka 构建的高吞吐量、高可扩展性的分布式消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等,是大数据生态中不可或缺的产品之一,阿里云提供全托管服务,用户无需部署运维,更专业、更可靠、更安全。
Knative 社区提供Kafka事件源,支持开源社区Kafka以及阿里云Kafka产品:
- 对接Kafka消息系统
- 消息分发到 Knative Service服务
AI 事件驱动场景实践
接下来我们以Kafka事件源为例,介绍一个典型的消息驱动场景。
客户是一个线上直播系统,每天都有海量的直播访问,访问量根据直播热度弹性波动,存在不定时激增;同时,直播系统支持用户在线互动,互动行为会通过消息服务(Kafka)实时推送到服务端进行数据和AI处理,处理结果会即时推送回视频流与视频直播产生联动。消息数据的处理主要有以下技术挑战:
1、业务弹性波动,消息并发高。
2、互动实时响应,低延迟。
为了满足客户系统对于消息处理的弹性波动、高并发、低时延的要求,客户选择使用阿里云的Knative服务进行数据的弹性处理。阿里云Knative完全契合用户的需求,在兼容k8s标准的基础上,提供了基于事件/消息的容器弹性调度。应用实例数随着业务波峰波谷实时扩容和缩容,真正做到了按需使用,实时弹性的云计算能力。整个过程完全自动化,极大的减少了业务开发人员在基础设施上的心智负担。接下来我们通过一个示例来模拟用户的场景
前提条件
- 创建ACK/ASK集群,并选择【Knative】进行安装
- 对于已有ASK集群,部署Knative参考
- 部署事件驱动Eventing
操作步骤
- 部署事件网关
在Knative 组件管理控制台,选择“Kourier”以及“EventGateway” addon组件进行部署
- 部署事件源。选择 Kafka 事件源进行部署。
- 部署服务
这里我们部署一个Knative Service 服务 event-display,用于接收事件。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display annotations: networking.knative.dev/flow-controller.class: strict networking.knative.dev/ingress.class: kourier.ingress.networking.knative.dev spec: template: metadata: annotations: autoscaling.knative.dev/class: xpa.autoscaling.knative.dev autoscaling.knative.dev/maxScale: "100" autoscaling.knative.dev/minScale: "1" autoscaling.knative.dev/pod-buffer-size: "3" autoscaling.knative.dev/gc-time: "30" spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:0128
部署完成之后,我们可以查看结果:
$ kubectl get po |grep event event-display-tslr5-deployment-5c9c4c469b-bnjlz 2/2 Running 0 3h53m event-display-tslr5-deployment-5c9c4c469b-bwxtm 2/2 Running 0 3h53m event-display-tslr5-deployment-5c9c4c469b-fjjr7 2/2 Running 0 5d13h
- 创建事件源CR
创建KafkaSource资源,设置Kafka 对应的topic、consumerGroup、 kourier 网关地址以及用于消费事件的服务event-display。
apiVersion: sources.knative.dev/v1alpha1 kind: KafkaSource metadata: name: kafka-source annotations: k8s.aliyun.com/domain: event-display.default.example.com k8s.aliyun.com/retry-count: "5" k8s.aliyun.com/retry-interval: "2" k8s.aliyun.com/req-timeout: "60" spec: consumerGroup: demo-consumer # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrapServers: 192.168.7.113:9092,192.168.7.110:9092,192.168.7.108:9092 topics: demo sink: uri: http://101.200.177.84
创建完成之后,可以查看对应的kafka事件源pod:
$ kubectl get po|grep kafka kafkasource-kafka-source-a8b95415-703f-4d50-8339-6203f0f6154pzq 1/1 Running 0 5d13h kafkasource-kafka-source-a8b95415-703f-4d50-8339-6203f0f61x9xc9 1/1 Running 0 5d13h
执行结果
我们通过客户端往kafka消息队列中发送消息进行结果验证,执行如下:
$ kubectl get po NAME READY STATUS RESTARTS AGE event-display-tslr5-deployment-5c9c4c469b-58hv6 2/2 Running 0 23s event-display-tslr5-deployment-5c9c4c469b-bnjlz 2/2 Running 0 4h6m event-display-tslr5-deployment-5c9c4c469b-bwxtm 2/2 Running 0 4h6m event-display-tslr5-deployment-5c9c4c469b-d9vxc 2/2 Running 0 23s event-display-tslr5-deployment-5c9c4c469b-fjjr7 2/2 Running 0 5d13h event-display-tslr5-deployment-5c9c4c469b-kl827 2/2 Running 0 24s event-display-tslr5-deployment-5c9c4c469b-l5642 2/2 Running 0 24s event-display-tslr5-deployment-5c9c4c469b-q82pq 2/2 Running 0 23s event-display-tslr5-deployment-5c9c4c469b-qh7z5 2/2 Running 0 24s event-display-tslr5-deployment-5c9c4c469b-qmcnn 2/2 Running 0 24s event-display-tslr5-deployment-5c9c4c469b-t7sv2 2/2 Running 0 24s event-display-tslr5-deployment-5c9c4c469b-tpnwk 2/2 Running 0 24s
总结
本文介绍了 Knative 驱动框架:Eventing,并支持 Kafka 以及 RocketMQ 事件源接入,对于 AI 事件驱动场景下我们如何将消息服务和 Knative 进行结合进行了介绍。欢迎有兴趣的同学进行交流。