Eventbridge是一个云原生事件驱动引擎,基于DDD领域驱动模型实现,其中有相关概念和运行的流程是需要我们了解的。以下内容基于rocketmq官网和自己的学习。
一、相关概念和使用场景
1.eventsource
事件源,用于管理发送到eventbridge的事件,对应CNCF中cloudEvent事件体的source字段。
代表事件发送的源头,通常与微服务系统相对应,也即属于系统层级的一个概念,类似于监控中应用服务级别的概念。
2.eventbus
事件总线,用于存储发送到eventbridge的事件。可以是Local、RocketMQ、Kafka等。
事件生产者发送事件时,需指定事件总线。
3.eventrule
事件规则,当消费者需要订阅事件时,可以通过规则配置过滤和转换信息,将事件推送到指定的目标端。
事件规则包含三部分:
事件过滤、事件转换、事件目标
4.filterPattern
过滤规则,用于在规则中配置过滤出目标端需要的事件,可以对事件总线上的事件进行过滤。
目前支持:指定配置、前缀配置、后缀匹配、除外配置、数值匹配、数组匹配以及复杂的组合逻辑匹配。
5.transform
事件转换,将事件格式转换成目标端需要的数据格式
目前eventbridage支持的事件转换能力:
完整事件:不做转换,直接投递原生cloudEvent
部分事件:通过jsonpath语法从cloudEvent中提取出需要投递到事件目标的内容
常量:事件只起到触发器的作用,投递内容为常量
模板转换器:通过定义目标,灵活地渲染投递处理地事件格式
6.eventTarget
事件目标端,真正地事件消费者,消费者只需要按照自己地业务领域模型设计,提供一个公共地API,eventbridge就会按照Api定义需要地数据格式,将事件安全可靠地推送给target消费者。
二、事件特性
已发生、无期望、天然有序且唯一、具象化
三、RocketMQ Eventbridge是如何工作的
下图来源于Apache RocketMQ的官网:
img
1.确认事件标准:基于CNCF的cloudEvent标准
2.建立事件中心:不同应用属于不同的事件,建立一个事件中心管理不同的事件
3.定义事件格式:明确对应的格式
4.订阅规则:订阅的时候,会存在对应的规则,而这些规则是执行过滤、转换的基础
5.事件总线:建立source和target之间的联系
也即:
source->事件中心->事件总线->filter(event)->transform->target
四、源码学习
根据前面我们提到的概念,在代码Adapter中,我们可以看到对应的控制层的相关的信息:
而这些controller中涉及的操作都是基于Spring WebFlux响应式编程实现。也即Flux和Mono实现返回数据格式对象。
1.Flux和Mono的联系和区别
其中:
Flux
是一个发出(emit)0-N
个元素组成的异步序列的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext
, onComplete
, 和onError
。
Mono
是一个发出(emit)0-1
个元素的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
两者的区别在于返回的数据:
一个是集合,一个是对象或者空,同时Flux和Mono之间可以转化。
2.putEvents操作和发送消息
其中EventDataController中会执行放入事件,发送消息的过程,比如putEvents和putHttpEvents这两个方法:
这里涉及到具体的转换器,可以看的作者的思路是基于EventConverterAdapter进行适配,这个过程中借助了map初始化,然后进行匹配请求头中的类型,然后拿到具体的转换器进行转换。
发送消息:
上面的这些controller都涉及到和数据库交互。
3.refresh刷新的场景
这个过程其实和消费是栖息相关的,因为消费者的增加和减少都会影响。类似重平衡中,消费者发生变化的时候,就会发生重平衡,因此这个操作也非常重要。
4.在runtime中会涉及到的相关操作
1)初始化信息
这个是后续操作的基础。
三种都继承了ServiceThread,可以看到最终实现了Runnable接口。
2)同时可以看到EventTargetTrigger的run方法里面:
执行记录放入到sinkTask中,同时提交偏移量,调用rocketmq。
3)EventRuleTransfer相关信息的处理
此时会调用转换引擎,通过openMessaging的Api接口,从而进入到转换引擎TransformEngine,进行EventBridgeTransform、EventBridgeFilterTransform、CloudEventTransform相关操作,也即相关的转换过滤这些操作都是在这里进行的。此时基于Transform中的process方法进行操作:
4)EventBusListener监听事件拉取消息记录放入上下文
5.消息拉取消费
storage模块中,我们可以看到我们想看的类:
可以看的eventbridge基于LitePullConsumerImpl进行poll操作,poll的过程基于本地消息缓存实现。
eventbirdge的学习,我们就到这里。整体上看,作者的思路是非常清晰的。