eventMesh源码学习

简介: eventMesh源码学习

eventMesh在runtime启动之前,需要启动roketmq的nameServer和broker,然后启动即可。启动完成后,再启动eventMesh中的example的生产者和消费者。

一、eventmesh-runtime启动的整体过程

eventMesh中的模块eventmesh-runtime的org.apache.eventmesh.runtime.boot.EventMeshStartup启动,基于http/tcp/grpc三种协议做了下面这些事情:

EventMeshHTTPConfiguration初始化
EventMeshTCPConfiguration初始化
EventMeshGrpcConfiguration初始化
EventMeshServer初始化init
EventMeshServer 基于http/tcp/grpc启动start 重要
EventMeshServer添加钩子关闭函数

这个过程在运行中是非常重要的。而这些过程中,EventMeshServer初始化init和启动start做的事情比较多,以http协议进行说明:

initThreadPool() 初始化线程池
初始化HttpRetryer、HTTPMetricsServer
初始化ConsumerManager、ProducerManager
registerHTTPRequestProcessor 注册处理器 重要

其中初始化线程池和注册处理器很重要

二、初始化相关线程池

batchMsgExecutor 批量消息线程池
sendMsgExecutor  发送消息线程池
remoteMsgExecutor 远程消息线程池
pushMsgExecutor  推送消息线程池
clientManageExecutor 客户端管理线程池
adminExecutor admin线程池
replyMsgExecutor 响应消息线程池

三、http协议下注册相关处理器

发送消息相关处理器:

BatchSendMessageProcessor => requestCode:MSG_BATCH_SEND => 线程池 batchMsgExecutor
BatchSendMessageV2Processor=> requestCode:MSG_BATCH_SEND_V2 =>线程池 batchMsgExecutor
SendSyncMessageProcessor =>requestCode:MSG_SEND_SYNC => 线程池 sendMsgExecutor
SendAsyncMessageProcessor => requestCode:MSG_SEND_ASYNC => 线程池 sendMsgExecutor
SendAsyncEventProcessor => 线程池 sendMsgExecutor
SendAsyncRemoteEventProcessor => 线程池 remoteMsgExecutor

指标心跳相关处理器:

AdminMetricsProcessor => requestCode:ADMIN_METRICS => 线程池 adminExecutor
HeartBeatProcessor =>  requestCode:HEARTBEAT => 线程池 clientManageExecutor

消费订阅相关处理器:

SubscribeProcessor=> requestCode:SUBSCRIBE => 线程池 clientManageExecutor
LocalSubscribeEventProcessor => 线程池 clientManageExecutor
RemoteSubscribeEventProcessor=> 线程池 clientManageExecutor
UnSubscribeProcessor=> requestCode:UNSUBSCRIBE => 线程池 clientManageExecutor
LocalUnSubscribeEventProcessor=> 线程池 clientManageExecutor
RemoteUnSubscribeEventProcessor=> 线程池 clientManageExecutor
ReplyMessageProcessor=> 线程池 clientManageExecutor
ReplyMessageProcessor=> requestCode:REPLY_MESSAGE =>线程池 clientManageExecutor

四、启动

// 执行httpServer启动
    @Override
    public void start() throws Exception {
        super.start();
        metrics.start();
        consumerManager.start();
        producerManager.start();
        httpRetryer.start();
        if (eventMeshHttpConfiguration.eventMeshServerRegistryEnable) {
            this.register();
        }
        logger.info("--------------------------EventMeshHTTPServer started");
    }

这里我们可以看到super.start()进行启动,而启动之前,会将对应的业务处理器进行放入。

此时会启动netty服务,这个时候会添加childHandler(new HttpsServerInitializer(sslContext)),也即HttpsServerInitializer。pipeline添加 httpHandler 重要 这里的httpHandler,就可以理解为业务处理器。这里会根据是否使用URI和是否使用链路追踪。分为processHttpRequest(ctx, asyncContext)和 processEventMeshRequest(ctx, asyncContext)。从代码上可以看到processHttpRequest(ctx, asyncContext)为空实现。所以处理eventmesh请求 重要 这里是eventMesh的请求,业务处理的核心部分,此时会走到对应的处理器。这里可以看到org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor这个处理器接口目前主要处理的一些处理器实现主要有:

发送消息处理器 有同步/异步/批量
心跳处理器
订阅处理器
取消订阅处理器
admin指标监控处理器
admin关闭处理器

这里会处理具体的业务逻辑发送和订阅以及心跳。因此此时启动eventmesh-runtime之后,还需要启动生产者和消费者。

五、生产者和消费者启动

启动之后,处理器会根据当前生产者消息处理到发送消息处理器,执行具体的业务处理eventMeshProducer.send(sendMessageContext, new SendCallback()),最终调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,也是调用发送消息核心实现。

可以看到eventmesh生产消息过程:

创建EventMeshHttpClientConfig配置对象信息
创建EventMeshHttpProducer生产者对象信息
构建EventMeshMessage对象信息
执行消息发布EventMeshHttpProducer
进行消息发布,以http请求为例,此时会有对应的code=>SG_SEND_ASYNC: key=>code
可以看到在请求生产者和消费者之前,会先启动eventMesh的runtime模块,此时会注册SendAsyncMessageProcessor发送异步消息处理器,可以看到对应的code信息 =>RequestCode.MSG_SEND_ASYNC

这个时候发送消息后,会进行消息的放入putMessage,此时会将消息存入到rocketmq的broker中。

然后就是我们的消费消息和订阅消息消费消息的过程:

处理请求消费消息,可以看到SubscribeProcessor订阅进行消费消息或者调用roketmq的消费消息接口直接进行消息消费。可以看到eventMeshHttpConsumer.subscribe(topicList, url)这里会执行订阅操作,进行消息消费。

具体可以跟踪eventmesh-example:


当然除了这里看到的http外,还有基于grpc、tcp协议的。以及webhook的相关操作。同时还有基于spi扩展的相关插件。


目录
相关文章
|
4月前
|
消息中间件 Java 测试技术
Goim框架的源码学习笔记
Goim框架的源码学习笔记
|
8月前
|
NoSQL Java 应用服务中间件
关于阅读源码
【1月更文挑战第12天】关于阅读源码
|
程序员 开发工具 C++
|
安全 Java
ReentranLock源码学习
线程的三大特性:原子性、可见性、有序性。也就是说满足这个三个特性的操作都是可以保证安全的,如Atomic包、volatile、通过happensBefore原则可以进行线程的安全的判断,这个依据通常是为了避免jvm指令重排。比如通常我们知道的配置信息,如果有多个线程去进行配置信息的修改,则需要进行上锁。或者多个线程修改一个变量时,此时就需要进行上锁了,或者读写分离时,可以考虑ReentrantReadWriteLock等。其本质是解决并行中的问题,将并行转成串行问题进行解决。那怎么上锁才有用呢?锁的状态大部分情况下是互斥的。当然也有特例:ReentrantReadWriteLock的读读是不会
103 0
ReentranLock源码学习
|
设计模式 分布式计算 资源调度
如何阅读源码
如何阅读源码
228 0
|
存储 人工智能 安全
C++学习必备——文章中含有源码
C++学习必备——文章中含有源码
127 0
C++学习必备——文章中含有源码
|
算法 Java 中间件
阅读优秀项目源码很重要,分享一个读源码的方法,小白都能学会
作为一个程序员,经常需要读一些开源项目的源码。同时呢,读源码对我们也有很多好处: 1.提升自己 2.修复 Bug 3.增加新功能
阅读优秀项目源码很重要,分享一个读源码的方法,小白都能学会
|
算法 NoSQL 前端开发
为什么要看源码、如何看源码,高手进阶必看
为什么要看源码、如何看源码,高手进阶必看
289 0
openFrameworks下的肤色检测源码
openFrameworks下的肤色检测源码
182 0
|
分布式计算 搜索推荐 前端开发
学会阅读源码后,我觉得自己better了
学会阅读源码后,我觉得自己better了
197 0