Eventbridge学习

简介: Eventbridge学习

Eventbridge是一个云原生事件驱动引擎,基于DDD领域驱动模型实现,其中有相关概念和运行的流程是需要我们了解的。以下内容基于rocketmq官网和自己的学习。

一、相关概念和使用场景

1.eventsource

事件源,用于管理发送到eventbridge的事件,对应CNCF中cloudEvent事件体的source字段。

代表事件发送的源头,通常与微服务系统相对应,也即属于系统层级的一个概念,类似于监控中应用服务级别的概念。

2.eventbus

事件总线,用于存储发送到eventbridge的事件。可以是Local、RocketMQ、Kafka等。

事件生产者发送事件时,需指定事件总线。

3.eventrule

事件规则,当消费者需要订阅事件时,可以通过规则配置过滤和转换信息,将事件推送到指定的目标端。

事件规则包含三部分:

事件过滤、事件转换、事件目标

4.filterPattern

过滤规则,用于在规则中配置过滤出目标端需要的事件,可以对事件总线上的事件进行过滤。

目前支持:指定配置、前缀配置、后缀匹配、除外配置、数值匹配、数组匹配以及复杂的组合逻辑匹配。

5.transform

事件转换,将事件格式转换成目标端需要的数据格式

目前eventbridage支持的事件转换能力:

完整事件:不做转换,直接投递原生cloudEvent

部分事件:通过jsonpath语法从cloudEvent中提取出需要投递到事件目标的内容

常量:事件只起到触发器的作用,投递内容为常量

模板转换器:通过定义目标,灵活地渲染投递处理地事件格式

6.eventTarget

事件目标端,真正地事件消费者,消费者只需要按照自己地业务领域模型设计,提供一个公共地API,eventbridge就会按照Api定义需要地数据格式,将事件安全可靠地推送给target消费者。

二、事件特性

已发生、无期望、天然有序且唯一、具象化

三、RocketMQ Eventbridge是如何工作的

下图来源于Apache RocketMQ的官网:

img

1.确认事件标准:基于CNCF的cloudEvent标准

2.建立事件中心:不同应用属于不同的事件,建立一个事件中心管理不同的事件

3.定义事件格式:明确对应的格式

4.订阅规则:订阅的时候,会存在对应的规则,而这些规则是执行过滤、转换的基础

5.事件总线:建立source和target之间的联系

也即:

source->事件中心->事件总线->filter(event)->transform->target

四、源码学习

根据前面我们提到的概念,在代码Adapter中,我们可以看到对应的控制层的相关的信息:


而这些controller中涉及的操作都是基于Spring WebFlux响应式编程实现。也即Flux和Mono实现返回数据格式对象。

1.Flux和Mono的联系和区别

其中:

Flux 是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError

Mono 是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。

两者的区别在于返回的数据:

一个是集合,一个是对象或者空,同时Flux和Mono之间可以转化。

2.putEvents操作和发送消息

其中EventDataController中会执行放入事件,发送消息的过程,比如putEvents和putHttpEvents这两个方法:

这里涉及到具体的转换器,可以看的作者的思路是基于EventConverterAdapter进行适配,这个过程中借助了map初始化,然后进行匹配请求头中的类型,然后拿到具体的转换器进行转换。

发送消息:

上面的这些controller都涉及到和数据库交互。

3.refresh刷新的场景

这个过程其实和消费是栖息相关的,因为消费者的增加和减少都会影响。类似重平衡中,消费者发生变化的时候,就会发生重平衡,因此这个操作也非常重要。

4.在runtime中会涉及到的相关操作

1)初始化信息


这个是后续操作的基础。

三种都继承了ServiceThread,可以看到最终实现了Runnable接口。


2)同时可以看到EventTargetTrigger的run方法里面:


执行记录放入到sinkTask中,同时提交偏移量,调用rocketmq。

3)EventRuleTransfer相关信息的处理


此时会调用转换引擎,通过openMessaging的Api接口,从而进入到转换引擎TransformEngine,进行EventBridgeTransform、EventBridgeFilterTransform、CloudEventTransform相关操作,也即相关的转换过滤这些操作都是在这里进行的。此时基于Transform中的process方法进行操作:


4)EventBusListener监听事件拉取消息记录放入上下文


5.消息拉取消费

storage模块中,我们可以看到我们想看的类:


可以看的eventbridge基于LitePullConsumerImpl进行poll操作,poll的过程基于本地消息缓存实现。


eventbirdge的学习,我们就到这里。整体上看,作者的思路是非常清晰的。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Serverless Kafka
基于 EventBridge 轻松搭建消息集成应用
基于 EventBridge 轻松搭建消息集成应用
22179 9
|
JSON 数据格式
EventBridge事件领域
EventBridge事件领域
64 0
|
消息中间件 机器人 Serverless
|
消息中间件 机器学习/深度学习 Kubernetes
EventBridge 事件总线及 EDA 架构解析
EventBridge 是事件驱动的具体落地产品,也是 EDA 的最佳实践方式。
414 0
EventBridge 事件总线及 EDA  架构解析
|
弹性计算 Kubernetes 数据可视化
EventBridge 特性介绍|以 IaC 的方式使用 EventBridge
本文将重点介绍 EventBridge 和 IaC 的重点概念和特性,然后演示如何应用 IaC 理念自动化部署 EventBridge 来使用这些概念和特性。
261 0
EventBridge 特性介绍|以  IaC 的方式使用 EventBridge
|
API 调度
阿里云事件总线 EventBridge 最佳实践
本文介绍如何把阿里云事件总线 EventBridge 的内容接入观测云平台,通过观测云强大的统一汇聚能力轻松获取阿里云事件,实时追踪最新的数据信息。
246 0
|
存储 运维 Kubernetes
事件总线EventBridge(EB)的测评
对阿里云 事件总线EventBridge(EB)部分场景的测评
258 0
事件总线EventBridge(EB)的测评
|
消息中间件 存储 Cloud Native
阿里云 EventBridge 事件驱动架构实践
我们认为 EventBridge 是云原生时代新的计算驱动力,这些数据可以驱动云的计算能力,创造更多业务价值。
7159 0
阿里云 EventBridge 事件驱动架构实践
|
存储 运维 数据可视化
EventBridge 事件总线及 EDA 架构解析(二)| 学习笔记
快速学习 EventBridge 事件总线及 EDA 架构解析。
EventBridge 事件总线及 EDA 架构解析(二)| 学习笔记
|
新零售 数据可视化 中间件
EventBridge 事件总线及 EDA 架构解析(三)| 学习笔记
快速学习 EventBridge 事件总线及 EDA 架构解析。
EventBridge 事件总线及 EDA 架构解析(三)| 学习笔记