eventMesh在runtime启动之前,需要启动roketmq的nameServer和broker,然后启动即可。启动完成后,再启动eventMesh中的example的生产者和消费者。
一、eventmesh-runtime启动的整体过程
eventMesh中的模块eventmesh-runtime的org.apache.eventmesh.runtime.boot.EventMeshStartup启动,基于http/tcp/grpc三种协议做了下面这些事情:
EventMeshHTTPConfiguration初始化 EventMeshTCPConfiguration初始化 EventMeshGrpcConfiguration初始化 EventMeshServer初始化init EventMeshServer 基于http/tcp/grpc启动start 重要 EventMeshServer添加钩子关闭函数
这个过程在运行中是非常重要的。而这些过程中,EventMeshServer初始化init和启动start做的事情比较多,以http协议进行说明:
initThreadPool() 初始化线程池 初始化HttpRetryer、HTTPMetricsServer 初始化ConsumerManager、ProducerManager registerHTTPRequestProcessor 注册处理器 重要
其中初始化线程池和注册处理器很重要
二、初始化相关线程池
batchMsgExecutor 批量消息线程池 sendMsgExecutor 发送消息线程池 remoteMsgExecutor 远程消息线程池 pushMsgExecutor 推送消息线程池 clientManageExecutor 客户端管理线程池 adminExecutor admin线程池 replyMsgExecutor 响应消息线程池
三、http协议下注册相关处理器
发送消息相关处理器:
BatchSendMessageProcessor => requestCode:MSG_BATCH_SEND => 线程池 batchMsgExecutor BatchSendMessageV2Processor=> requestCode:MSG_BATCH_SEND_V2 =>线程池 batchMsgExecutor SendSyncMessageProcessor =>requestCode:MSG_SEND_SYNC => 线程池 sendMsgExecutor SendAsyncMessageProcessor => requestCode:MSG_SEND_ASYNC => 线程池 sendMsgExecutor SendAsyncEventProcessor => 线程池 sendMsgExecutor SendAsyncRemoteEventProcessor => 线程池 remoteMsgExecutor
指标心跳相关处理器:
AdminMetricsProcessor => requestCode:ADMIN_METRICS => 线程池 adminExecutor HeartBeatProcessor => requestCode:HEARTBEAT => 线程池 clientManageExecutor
消费订阅相关处理器:
SubscribeProcessor=> requestCode:SUBSCRIBE => 线程池 clientManageExecutor LocalSubscribeEventProcessor => 线程池 clientManageExecutor RemoteSubscribeEventProcessor=> 线程池 clientManageExecutor UnSubscribeProcessor=> requestCode:UNSUBSCRIBE => 线程池 clientManageExecutor LocalUnSubscribeEventProcessor=> 线程池 clientManageExecutor RemoteUnSubscribeEventProcessor=> 线程池 clientManageExecutor ReplyMessageProcessor=> 线程池 clientManageExecutor ReplyMessageProcessor=> requestCode:REPLY_MESSAGE =>线程池 clientManageExecutor
四、启动
// 执行httpServer启动 @Override public void start() throws Exception { super.start(); metrics.start(); consumerManager.start(); producerManager.start(); httpRetryer.start(); if (eventMeshHttpConfiguration.eventMeshServerRegistryEnable) { this.register(); } logger.info("--------------------------EventMeshHTTPServer started"); }
这里我们可以看到super.start()进行启动,而启动之前,会将对应的业务处理器进行放入。
此时会启动netty服务,这个时候会添加childHandler(new HttpsServerInitializer(sslContext)),也即HttpsServerInitializer。pipeline添加 httpHandler 重要 这里的httpHandler,就可以理解为业务处理器。这里会根据是否使用URI和是否使用链路追踪。分为processHttpRequest(ctx, asyncContext)和 processEventMeshRequest(ctx, asyncContext)。从代码上可以看到processHttpRequest(ctx, asyncContext)为空实现。所以处理eventmesh请求 重要 这里是eventMesh的请求,业务处理的核心部分,此时会走到对应的处理器。这里可以看到org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor这个处理器接口目前主要处理的一些处理器实现主要有:
发送消息处理器 有同步/异步/批量 心跳处理器 订阅处理器 取消订阅处理器 admin指标监控处理器 admin关闭处理器
这里会处理具体的业务逻辑发送和订阅以及心跳。因此此时启动eventmesh-runtime之后,还需要启动生产者和消费者。
五、生产者和消费者启动
启动之后,处理器会根据当前生产者消息处理到发送消息处理器,执行具体的业务处理eventMeshProducer.send(sendMessageContext, new SendCallback()),最终调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl,也是调用发送消息核心实现。
可以看到eventmesh生产消息过程:
创建EventMeshHttpClientConfig配置对象信息 创建EventMeshHttpProducer生产者对象信息 构建EventMeshMessage对象信息 执行消息发布EventMeshHttpProducer 进行消息发布,以http请求为例,此时会有对应的code=>SG_SEND_ASYNC: key=>code 可以看到在请求生产者和消费者之前,会先启动eventMesh的runtime模块,此时会注册SendAsyncMessageProcessor发送异步消息处理器,可以看到对应的code信息 =>RequestCode.MSG_SEND_ASYNC
这个时候发送消息后,会进行消息的放入putMessage,此时会将消息存入到rocketmq的broker中。
然后就是我们的消费消息和订阅消息消费消息的过程:
处理请求消费消息,可以看到SubscribeProcessor订阅进行消费消息或者调用roketmq的消费消息接口直接进行消息消费。可以看到eventMeshHttpConsumer.subscribe(topicList, url)这里会执行订阅操作,进行消息消费。
具体可以跟踪eventmesh-example:
当然除了这里看到的http外,还有基于grpc、tcp协议的。以及webhook的相关操作。同时还有基于spi扩展的相关插件。