聊聊 RocketMQ 网络通讯模块

简介: RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。

RocketMQ 的网络通讯模块负责生产者、消费者与 Broker 之间的网络通信。

笔者学习 RocketMQ 也是从通讯模块源码开始的,并且从源码里汲取了很多营养。

1 网络协议

客户端和服务端之间完成数据交互,需要约定数据协议。数据协议如下图:

传输内容分为以下四个部分:

1、消息长度:

​ 总长度,四个字节存储,占用一个 int 类型;

2、序列化类型 & 消息头长度:

​ 占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

3、消息头数据

​ 经过序列化后的消息头数据;

4、消息主体数据:

​ 消息主体的二进制字节数据内容。

消息头数据序列化默认是 JSON 格式 ,示例如下:

header格式说明

网络协议设计的原则是便于编解码,这里我们温习下 TCP 粘包拆包的知识点。

TCP 是面向字节流的协议,它会将应用层发送的数据拆分成 TCP 报文段进行传输,发送端和接收端都会维护一个 buffer ,发送的数据首先会存至缓冲区 buffer ,然后通过网络发送给接收端的 buffer 中。

  • 粘包

如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送 。

  • 拆包

如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送。

Netty 通过以下几种方式来解决粘包问题:

1、消息定长:FixedLengthFrameDecoder

发送的消息都是固定长度的,接收方根据固定长度来解析消息,这样可以有效避免粘包和拆包问题。

2、特定分隔符:DelimiterBasedFrameDecoder

在消息的末尾添加特定的分隔符,接收方根据分隔符来切分消息。

3、消息头长度:LenghtFieldBasedFrameDecode

在消息的头部添加表示消息长度的字段,接收方先读取消息头部的长度字段,然后根据长度字段的值来读取消息内容,从而正确地解析出完整的消息。

RocketMQ 的解码器就是使用了 LenghtFieldBasedFrameDecode

2 通讯方式

客户端通信方式支持同步 sync异步 async单向 oneway 三种方式 。

2.1 同步 sync

在同步通信中,客户端发送请求后会一直等待服务器响应,直到接收到响应或者超时。

这意味着:客户端发送线程在发送请求后会被阻塞,直到收到服务器的响应,然后继续执行发送下一个请求。

同步请求的流程:

1、客户端连接服务端,创建 channel ;

2、客户端创建 responseFutrue 对象 ,主要由四个部分组成:响应结果、请求编号、回调函数、CountDownLatch。然后将 responseFutrue 对象加入到本地缓存 响应表 reponseTable 里 。

3、客户端将请求发送到服务端;

4、服务端解析出请求命令 ;

  1. 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执行请求命令;
  2. 服务端将响应数据返回给客户端;
  3. 客户端将响应结果填充到响应表 reponseTable 里,同时因为是同步命令,并调用 countDownLatch 的 countDown 方法 , 这样发送消息线程就不再阻塞(实现同步请求的精髓)。

2.2 异步 async

异步通信中,客户端发送请求后不会等待服务器的响应,而是继续执行后续代码。客户端会注册一个回调函数或者监听器,用于处理服务器响应。当服务器响应返回时,会触发回调函数的执行。

异步请求的流程 :

1、客户端连接服务端,创建 channel ;

2、通过信号量 semaphoreAsync 限制正在进行的异步请求的最大数量 ;

boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

3、客户端创建 responseFutrue 对象 ,主要由四个部分组成:响应结果、请求编号、回调函数、CountDownLatch。然后将 responseFutrue 对象加入到本地缓存 响应表 reponseTable 里 。

4、客户端将请求发送到服务端,客户端异步方法结束 。

5、服务端解析出请求命令 ;

  1. 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执行请求命令;
  2. 服务端将响应数据返回给客户端;

6、通讯框架收到服务端的响应数据后,通过回调线程执行回调函数。

2.3 单向 oneway

单向通信发起调用后,不关心调用结果,不做超时控制,只要请求已经发出,就完成本次调用。

通常用于可以重试,或者定时通知类的场景,调用过程是有可能因为网络问题,机器故障等原因,导致请求失败。业务场景需要能接受这样的异常场景,才可以使用。

需要注意的是,单向通信不能保证请求一定能够成功发送到服务器,也无法保证服务器是否正确地接收到了请求。

oneway 请求的流程 :

1、客户端连接服务端,创建 channel ;

2、通过信号量 semaphoreOneway 限制正在进行的 oneway 请求的最大数量 ;

boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.*MILLISECONDS*);

3、客户端将请求发送到服务端,客户端 oneway 请求方法结束 。

4、服务端解析出请求命令 , 请求命令中包含命令类型、请求编号,服务端根据命令类型选择处理器 ,执行请求命令 , 并不会将响应数据返回给客户端 ;

下表展示了同步异步单向这三种通讯方式的优劣点:

方式 发送TPS 发送结果反馈 可靠性
同步 不丢失
异步 不丢失
单向 最快 可能丢失

3 Reactor多线程设计

RocketMQ 的通信模块采用 Netty 组件作为底层通信库,同样也遵循了 Reactor 多线程模型,同时又在这之上做了一些扩展和优化。

一个 Reactor 主线程 ( eventLoopGroupBoss )责监听 TCP网络连接请求,建立好连接,创建 SocketChannel , 并注册到 selector 上。

RocketMQ 源码会自动根据 OS 的类型选择 NIO 和 Epoll ,也可以通过参数配置 ), 然后监听真正的网络数据。

拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector ),再真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作都交给 defaultEventExecutorGroup 去做。

而业务操作由业务线程池中处理,根据 RemotingCommand 的业务请求编号 requestCode , 从处理器表 processorTable 这个本地缓存中找到对应的处理器 , 然后封装成 task 任务后,提交到对应的业务处理器的线程池执行。

从入口到业务逻辑的几个步骤里,线程池一直在增加,这跟每一步步骤逻辑复杂性相关 ,越复杂,需要的并发通道越宽。

RocketMQ 的线程模型如下所示 :

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor 主线程
N NettyServerEPOLLSelector%d%d Reactor 线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务 processor 处理线程池

4 写到最后

通讯模块核心知识点 :

1、网络协议设计原则便于编解码,Netty 的 LenghtFieldBasedFrameDecode 解码器非常容易得解决 TCP 粘包和拆包的问题;

2、网络通讯框架支持同步异步单向这三种通讯方式 ;

3、理解 Reactor 线程模型很关键 。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
机器学习/深度学习 编解码 边缘计算
YOLOv5改进 | 卷积模块 | 用ShuffleNetV2卷积替换Conv【轻量化网络】
本文介绍了如何在YOLOv5中用ShuffleNetV2替换卷积以减少计算量。ShuffleNetV2是一个轻量级网络,采用深度可分离卷积、通道重组和多尺度特征融合技术。文中提供了一个逐步教程,包括ShuffleNetV2模块的代码实现和在YOLOv5配置文件中的添加方法。此外,还分享了完整的代码链接和GFLOPs的比较,显示了GFLOPs的显著减少。该教程适合初学者实践,以提升深度学习目标检测技能。
YOLOv5改进 | 卷积模块 | 用ShuffleNetV2卷积替换Conv【轻量化网络】
|
4月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
域名解析 安全 网络协议
WebKit的网络模块支持的最新网络协议和安全标准
WebKit的网络模块支持的最新网络协议和安全标准
|
2月前
|
JSON API 数据格式
Python网络编程:HTTP请求(requests模块)
在现代编程中,HTTP请求几乎无处不在。无论是数据抓取、API调用还是与远程服务器进行交互,HTTP请求都是不可或缺的一部分。在Python中,requests模块被广泛认为是发送HTTP请求的最简便和强大的工具之一。本文将详细介绍requests模块的功能,并通过一个综合示例展示其应用。
|
4月前
|
数据采集 JSON 数据格式
三:《智慧的网络爬虫》— 网络请求模块(下)
本篇文章讲解了网络请求模块中Requests模块的get请求和post请求,并用十几张图示详细介绍了爬虫工具库与开发者工具的操作与使用;同时本篇文章也列举了多个代码示例如:对搜狗网页的爬取;爬取360翻译(中英文互译程序)并以此介绍了重放请求(通过重放请求来确定反爬参数)以及Cookie与Session实战案例 -- 爬取12306查票
52 9
三:《智慧的网络爬虫》—  网络请求模块(下)
|
4月前
|
数据采集 数据安全/隐私保护 Python
二:《智慧的网络爬虫》— 网络请求模块(上)
网络请求模块就是帮助浏览器(客户端)向服务器发送请求的​。在Python3之前的版本(Python2版本)中所使用的网络请求模块是urllib模块​;在Python3现在的版本中通过urllib模块进行升级 有了现在所使用的requests模块,也就是requests模块是基于urllib模块进行开发的。本篇文章讲解的是urllib模块。
40 2
二:《智慧的网络爬虫》—  网络请求模块(上)
|
3月前
|
网络协议 Python
在Python中,我们使用`socket`模块来进行网络通信。首先,我们需要导入这个模块。
在Python中,我们使用`socket`模块来进行网络通信。首先,我们需要导入这个模块。
|
4月前
|
机器学习/深度学习 自然语言处理 并行计算
YOLOv8改进 | 注意力机制 | 在主干网络中添加MHSA模块【原理+附完整代码】
Transformer中的多头自注意力机制(Multi-Head Self-Attention, MHSA)被用来增强模型捕捉序列数据中复杂关系的能力。该机制通过并行计算多个注意力头,使模型能关注不同位置和子空间的特征,提高了表示多样性。在YOLOv8的改进中,可以将MHSA代码添加到`/ultralytics/ultralytics/nn/modules/conv.py`,以增强网络的表示能力。完整实现和教程可在提供的链接中找到。
|
4月前
|
机器学习/深度学习 计算机视觉
YOLOv8改进 | 卷积模块 | 在主干网络中添加/替换蛇形卷积Dynamic Snake Convolution
本专栏介绍的DSCNet采用蛇形动态卷积,增强对管状结构特征提取,尤其适合血管等弯曲目标。动态卷积核自适应调整,灵感来自蛇形曲线,能灵活捕捉不同尺度细节。论文及官方代码链接已提供,适用于提升目标检测的准确性和鲁棒性。
|
5月前
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究
BOSHIDA AC/DC电源模块在通信与网络设备中的应用研究

相关产品

  • 云消息队列 MQ
  • 下一篇
    无影云桌面