【阅读原文】戳:解密开源Serverless容器框架:事件驱动篇
关于事件驱动
早在2018年,Gartner评估报告将Event-Driven Model列为10大战略技术趋势之一,并强调事件驱动架构 (EDA) 是技术和软件领域发展的主要驱动力。
事件驱动是指在分布式系统中,各个组件之间的交互基于事件通信,而非直接的请求-响应模式,具有异步、松散耦合等特征。在EDA中,组件之间通过发布(Publish)和订阅(Subscribe)事件来实现协作,事件可以是用户操作、系统状态变化、传感器数据等。
云原生Serverless事件驱动框架:Knative Eventing
Knative是一款基于Kubernetes集群的开源Serverless框架,提供了云原生、跨平台的Serverless 编排标准。作为Serverless 中必不可少的事件驱动能力,Knative Eventing提供了云原生的事件驱动能力。
Knative Eventing是一个独立平台,为各种类型的工作负载提供支持,包括标准Kubernetes服务和Knative Serving服务。使用标准HTTP POST请求在事件生产者和接收器之间发送和接收事件。这些事件符合CloudEvents规范,支持使用任何编程语言创建、解析、发送和接收事件。此外Knative Eventing组件是松散耦合的,可以彼此独立开发和部署。
Knative Eventing一般使用场景:
• 发布事件:可以将事件作为HTTP POST发送到代理(broker),与生产事件的应用程序解耦。
• 消费事件:可以使用触发器(trigger)根据事件属性消费来自代理的事件。消费服务以HTTP POST形式接收事件。
Knative中的事件网格(Broker/Trigger)
事件网格是一种动态基础设施,旨在简化从发送者到接收者的事件分发。与Apache Kafka或RabbitMQ等传统消息通道架构类似,事件网格提供异步(存储转发)消息传递,允许及时解耦发送者和接收者。不过与传统的基于消息通道的集成模式不同,事件网格还通过将发送者/接收者与底层事件传输基础设施(可能是一组联合的解决方案,如Kafka、RabbitMQ或云提供商基础设施)解耦,简化了发送者和接收者的路由问题。事件网格通过任何环境中互连的事件Broker将事件从生产者传输到消费者,甚至以无缝和松散耦合的方式在云之间传输。
图片来源于Knative社区
如上图所示,Knative事件网格定义了事件入口和出口的Broker和TriggerAPI。Knative Eventing使用一种称为“鸭子类型”的模式,允许多种资源参与事件网格。鸭子类型允许多种资源类型宣传通用功能,例如“可以在URL处接收事件”或“可以将事件传递到目的地”。Knative Eventing使用这些功能来提供可互操作的源池,用于将事件发送到Broker和作为路由事件的目的地Trigger。Knative Eventing API包含三类API:
事件入口:支持连接事件发送者:Source duck type和SinkBinding,支持轻松配置应用程序以将事件传递到Broker。即使没有安装任何源,应用程序也可以提交事件并使用Eventing。
事件路由:Broker和Trigger对象支持定义网格和事件路由。请注意,Broker符合可寻址事件目标的定义,因此可以将事件从一个集群中的Broker中继到另一个集群中的Broker。同样,Trigger使用与许多源相同的Deliverable鸭子类型,因此很容易用事件网格替代直接传递事件。
事件出口:Deliverable契约支持指定裸URL或引用实现Addressable接口(具有)的Kubernetes对象status.address.url作为目的地。
- 事件源 -
Knative事件源
Knative社区提供了丰富的事件源支持,主要如下:
• APIServerSource:将Kubernetes API服务器事件引入 Knative。每次创建、更新或删除Kubernetes资源时,APIServerSource都会触发一个新事件。
• PingSource:定时发送一条Ping的事件通知。
• Apache CouchDB:将Apache CouchDB消息传入Knative。
• Apache Kafka:KafkaSource从Apache Kafka集群读取事件,并将这些事件传递到发送给事件消费端。
• RabbitMQ
• GitHub
• GitLab
• RedisSource
此外还支持Apache Camel、VMware等第三方事件源。
- 事件转发 -
Broker/Trigger事件转发流程
我们以InMemoryChannel (IMC) 事件处理流程为例,介绍 Knative中事件处理流程。
在Knative Eventing中使用Broker/Trigger模型需要选择相应的Channel, 也就是事件流转系统,当前社区支持Kafka、NATS Streaming、InMemoryChannel事件转发通道,默认是 InMemoryChannel。
其中几个关键组件说明如下:
• Ingress: Broker/Trigger模型中,事件接收入口,用于接收事件并转发到对应的Channel服务上。
• imc-dispatch: InMemoryChannel事件转发服务。用于接收Ingress过来的事件请求,并根据InMemoryChannel中描述的转发目标(subscription)Fan Out到Filter服务上。
• Filter: 用于事件过滤,实现逻辑是基于Trigger中定义的过滤规则,进行事件过滤,并最终转发到对应的目标服务上。
以如下Broker示例进行解释说明:
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: generation: 1 name: default namespace: default spec: config: apiVersion: v1 kind: ConfigMap name: config-br-default-channel namespace: knative-eventing delivery: backoffDelay: PT0.2S backoffPolicy: exponential retry: 10 status: address: name: http url: http://broker-ingress.knative-eventing.svc.cluster.local/default/default annotations: knative.dev/channelAPIVersion: messaging.knative.dev/v1 knative.dev/channelAddress: http://default-kne-trigger-kn-channel.default.svc.cluster.local knative.dev/channelKind: InMemoryChannel knative.dev/channelName: default-kne-trigger ...
这里status.address.url 实际传入的参数: http://broker-ingress.knative-eventing.svc.cluster.local/{namespace}/{broker}。也就是说每个broker创建完成之后,会在Ingress服务中对应请求的Path。
Ingress接收到事件之后转发到哪里呢?它会根据status中 knative.dev/channelAddress地址进行转发。在 IMC Channel中,会转发到default-kne-trigger-kn-channel服务。
http://default-kne-trigger-kn-channel.default.svc.cluster.local
我们接着看这个default-kne-trigger-kn-channel服务对应哪里,它实际对应了imc-dispatcher pod,也就是说通过dispatcher进行转发。
kubectl get svc default-kne-trigger-kn-channel NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE default-kne-trigger-kn-channel ExternalName <none> imc-dispatcher.knative-eventing.svc.cluster.local 80/TCP 98m
dispatcher中核心处理在Fanout Handler中,它负责将接收到的事件分发到不同的Subscription。
现在来看一下InMemoryChannel的配置, 里面定义了所转发的Subscription:
apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel metadata: labels: eventing.knative.dev/broker: default eventing.knative.dev/brokerEverything: "true" name: default-kne-trigger namespace: default ownerReferences: - apiVersion: eventing.knative.dev/v1 blockOwnerDeletion: true controller: true kind: Broker name: default uid: cb148e43-6e6c-45b0-a7b9-c5b1d81eeeb6 spec: delivery: backoffDelay: PT0.2S backoffPolicy: exponential retry: 10 subscribers: - delivery: backoffDelay: PT0.2S backoffPolicy: exponential retry: 10 generation: 1 replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/default/default subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/my-service-trigger/f8df36a0-df4c-47cb-8c9b-1405111aa7dd uid: 382fe07c-ce4d-409b-a316-9be0b585183a status: address: name: http url: http://default-kne-trigger-kn-channel.default.svc.cluster.local ... subscribers: - observedGeneration: 1 ready: "True" uid: 382fe07c-ce4d-409b-a316-9be0b585183a
这里对应的http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/my-service-trigger/f8df36a0-df4c-47cb-8c9b-1405111aa7dd服务。也就是对应转发到broker filter服务上。
然后filter服务根据trigger中定义的过滤条件(filter属性)进行事件过滤,将过滤之后的事件发送到status中的subscriberUri访问地址。这里为http://event-display.default.svc.cluster.local。
apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: labels: eventing.knative.dev/broker: default name: my-service-trigger namespace: default spec: broker: default filter: {} subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display namespace: default status: ... observedGeneration: 1 subscriberUri: http://event-display.default.svc.cluster.local
到此,基于Broker/Trigger模型使用inmemorychannel channel转发的整个流程结束。
curl -v "http://172.16.85.64/default/default" -X POST -H "Ce-Id: 536808d3-88be-4077-9d7a-a3f162705f79" -H "Ce-Specversion: 1.0" -H "Ce-Type: dev.knative.samples.helloworld" -H "Ce-Source: dev.knative.samples/helloworldsource" -H "Content-Type: application/json" -d '{"msg":"Hello World from the curl pod."}'
2024/09/23 03:25:23 receive cloudevents.Event: %!(EXTRA string=Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.samples.helloworld source: dev.knative.samples/helloworldsource id: 536808d3-88be-4077-9d7a-a3f162705f79 time: 2024-09-23T03:25:03.355819672Z datacontenttype: application/json Extensions, knativearrivaltime: 2024-09-23T03:25:23.380731115Z Data, { "msg": "Hello World from the curl pod." } )
- 事件编排 -
Knative Eventing提供了2类自定义资源(CRDs)用于定义事件编排流程:
• Sequence :顺序事件处理流程
• Parallel:并行事件处理流程
Sequence
Sequence是将多个Knative服务顺序编排在一起,并将处理之后的结果输出作为下一个服务的输入。配置示例如下:
apiVersion: flows.knative.dev/v1 kind: Sequence metadata: name: sequence spec: channelTemplate: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel steps: - ref: apiVersion: serving.knative.dev/v1 kind: Service name: first - ref: apiVersion: serving.knative.dev/v1 kind: Service name: second - ref: apiVersion: serving.knative.dev/v1 kind: Service name: third reply: ref: kind: Service apiVersion: serving.knative.dev/v1 name: event-display
使用场景包括:
• 顺序处理
如图创建一个PingSource,将事件提供给Sequence,然后获取该Sequence的输出并显示结果输出。
• Sequence连接另外一个Sequence
如图创建一个PingSource,将事件发送到Sequence服务中,然后获取该Sequence服务处理的输出并将其发送到第二个Sequence服务,最后显示结果输出。
• 直接处理
如图创建一个PingSource,将事件提供给Sequence。然后直接通过Sequence顺序执行服务。
• 通过Broker/Trigger模型
如图创建一个PingSource,将事件输入Broker,然后创建一个 Filter,将这些事件连接到由3个服务组成的Sequence中。然后,我们获取Sequence的输出,并将新创建的事件发送回Broker,并创建另一个Trigger,最后通过EventDisplay服务打印这些事件。
Parallel
Parallel是Knative Eventing中定义的并行处理工作流,配置示例如下:
apiVersion: flows.knative.dev/v1 kind: Parallel metadata: name: demo-parallel namespace: default spec: branches: - subscriber: ref: apiVersion: v1 kind: Service name: demo-ksvc1 namespace: default - subscriber: ref: apiVersion: v1 kind: Service name: demo-ksvc2 namespace: default channelTemplate: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel
Parallel事件处理流程如图:
在Trigger中配置目标为Parallel即可,然后通过Parallel可以配置多个目标服务,系统默认创建相应的Subscription用于转发到对应的Knative Service服务。
与EventBridge结合
Knative Eventing中默认的InMemoryChannel是基于内存的Channel,目前社区并不推荐生产环境使用。建议使用Kafka、EventBridge等消息或事件驱动产品。
事件总线EventBridge是阿里云提供的一款无服务器事件总线服务,支持阿里云服务、自定义应用、SaaS应用以标准化、中心化的方式接入,并能够以标准化的CloudEvents 1.0协议在这些应用之间路由事件,帮助您轻松构建松耦合、分布式的事件驱动架构。
EventBridge支持多种数据源接入,通过配置事件总线、事件规则、事件目标,经过事件过滤、转换,可以触发EventBridge事件驱动Knative服务消费事件,实现资源的按需使用。技术架构图如下所示:
当前阿里云容器服务Knative中提供了一键配置Trigger的产品化能力,如图:
通过EventBridge投递事件效果如下:
与EventBridge结合的优势如下:
• 标准与生态
兼容CloudEvents协议,全面拥抱开源社区生态;
集成阿里云更多的事件源与事件目标处理服务,覆盖大部分用户的场景。
• 高吞吐和容灾能力
底层基于高吞吐、高可靠、多副本容灾的消息内核作为存储;
提供事件回放、事件轨迹追踪等差异化特性。
• 功能完善
简单灵活的配置,支持事件过滤和事件路由;
提供跨Region、混合云、多云的事件推送能力。
• 配套工具链
schema中心化存储与多语言映射,提升事件处理协作效率;
schema发现,自动注册与校验,IDE插件集成。
• 可观测性&可治理性
提供事件的可观测性能力,支持事件查询、审计以及全链路的追踪;
提供事件的可治理能力,支持事件流控、事件回放、事件重试策略等。
与KEDA的差异
说起K8s中事件驱动,KEDA也是一个广为熟知的事件驱动服务。 那我对于KEDA与Knative Eventing二者有什么不同,经常有些疑问。这里从我的角度来解释一下。其实从这二者的定位,就更容易进行区分。
先看KEDA官方定义:
KEDA is a Kubernetes-based Event Driven Autoscaler. With KEDA, you can drive the scaling of any container in Kubernetes based on the number of events needing to be processed.
KEDA是一个基于Kubernetes的事件驱动自动缩放器。借助 KEDA,您可以根据需要处理的事件数量来驱动Kubernetes中任何容器的扩缩容。我们可以简单理解为KEDA是支持多种指标能力增强HPA,它基于事件数量的指标,扩容Pod,然后Pod自身负责从对应的服务中获取事件进行消费。在整个事件驱动流程中KEDA并不会接管事件的流转,它仅根据事件指标扩缩容消费服务的Pod数量,可以说是一个非常简单、易上手的事件驱动弹性服务。
再看Knative Eventing:
Knative Eventing is a collection of APIs that enable you to use an event-driven architecture with your applications.
就像本文所介绍的,Knative Eventing则接管了事件的编排、流转、规则过滤、分发,提供了一个比较完整的事件驱动框架。
因此对于选择Knative Eventing和KEDA,最好结合自身的使用场景进行选择。
一个有趣的场景
最后在Knative中基于事件驱动分享一个简单的demo场景。
我们知道,口渴了,那么要喝水。人体执行这个简单的事情,处理流程可以简单抽象为下图:
那么在Knative中模拟这个场景,可以进行如下操作:
• 发送一个口渴的事件
• 千问大模型接收到口渴输入,给出决策
• 执行模拟喝水的服务
根据相关的健康饮水指引,上班族应该形成良好的喝水习惯,每1小时应该喝一次水。我们这里假设通过PingSource模拟每1个小时发送一个口渴的信号。
apiVersion: sources.knative.dev/v1 kind: PingSource metadata: name: ping-source spec: schedule: "* */1 * * *" contentType: "application/json" data: '{"model": "qwen", "messages": [{"role": "user", "content": "渴了"}], "max_tokens": 10, "temperature": 0.7, "top_p": 0.9, "seed": 10}' sink: ref: apiVersion: flows.knative.dev/v1 kind: Sequence name: sequence
通过Sequence进行编排,将接收到的信号发送给qwen大模型服务(参考:基于Knative部署vLLM推理应用[1])。
apiVersion: flows.knative.dev/v1 kind: Sequence metadata: name: sequence spec: channelTemplate: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel steps: - ref: apiVersion: serving.knative.dev/v1 kind: Service name: qwen reply: ref: kind: Service apiVersion: serving.knative.dev/v1 name: drink-svc
qwen服务给出决策结果如下:
{"id":"cmpl-6251aab6a0dc4932beb82714373db2ac","object":"chat.completion","created":1733899095,"model":"qwen","choices":[{"index":0,"message":{"role":"assistant","content":"如果你感到口渴,可以尝试喝一些水"},"logprobs":null,"finish_reason":"length","stop_reason":null}],"usage":{"prompt_tokens":10,"total_tokens":20,"completion_tokens":10}}
然后调用drink-svc喝水服务
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: drink-svc namespace: default spec: template: spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/event-display:v1211-action env: - name: ACTION value: "drink water"
这里直接打印输出结果:
# 日志输出结果如下所示: ACTION: drink water
最后我们再来看一张Nvidia对Agentic AI[2]场景定义的图片:
Agentic AI是具有更高程度自主性的AI系统,它们能够主动思考、规划和执行任务,而不仅仅依赖于预设的指令。因此在 Agentic AI应用场景下,或许Knative可以提供一些帮助。
小结
Knative Eventing提供了一款开源的云原生Serverless事件驱动框架,相信在Serverless与AI进行结合的场景中有更多的应用空间。欢迎有兴趣的加入阿里云Knative钉钉交流群(群号: 23302777)。
参考:
[1]基于Knative部署vLLM推理应用:
[2]What Is Agentic AI?
https://blogs.nvidia.com/blog/what-is-agentic-ai/
我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。
获取关于我们的更多信息~