RocketMQ底层通信机制

简介: 分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的Tcp通信协议是个很有挑战的事情,本节说明RocketMQ是如何解决这个问题的 1.1.1 Remoting模块RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构。

分布式系统各个角色间的通信效率很关键,通信效率的高低直接影响系统性能,基于Socket实现一个高效的Tcp通信协议是个很有挑战的事情,本节说明RocketMQ是如何解决这个问题的
1.1.1 Remoting模块
RocketMQ的通信相关代码在Remoting模块里,先来看看主要类结构。
1_1

RemotingService为最上层接口,定义了三个方法:
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
RemotingClient,RemotingServer继承RemotingService接口, 并增加了自己特有的方法。
代码清单1-1 RemotingClient主要函数定义
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,final ExecutorService executor);
RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis);
void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,final InvokeCallback invokeCallback);
void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis);
void updateNameServerAddressList(final List addrs);

然后看看具体的实现类,NettyRemotingClient和NettyRemotingServer分别实现了RemotingClient和RemotingServer, 而且都继承了NettyRemotingAbstract类.
通过上面的封装,RocketMQ各个模块间的通信,可以通过发送统一格式的自定义消息(RemotingCommand)来完成的,各个模块间的通信实现简洁明了。
比如NameServer模块中,NameServerController有个remotingServer变量,NameServer在启动时初始化好各个变量,然后启动remotingServer即可,剩下NameServer要做的是专心实现好处理RemotingCommand的逻辑。
代码清单1-2 NameServer处理主流程代码
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {

if (log.isDebugEnabled()) {
    log.debug("receive request, {} {} {}",
        request.getCode(),
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
        request);
}
switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        Version brokerVersion = MQVersion.value2Version(request.getVersion());
        if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
            return this.registerBrokerWithFilterServer(ctx, request);
        } else {
            return this.registerBroker(ctx, request);
        }
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    default:
        break;
}
return null;

}
在Consumer的源码中,获取消息的底层的通信部分也是发送一个RemotingComand 请求,返回的response也是个RemotingCommand类型。
代码清单1-3 Consumer请求消息底层实现代码
private PullResult pullMessageSync(//

final String addr, // 1
final RemotingCommand request, // 2
final long timeoutMillis// 3

) throws RemotingException, InterruptedException, MQBrokerException {

RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processPullResponse(response);

}
从源码中可以看出,RocketMQ中复杂的通信过程,被RemotingCommand统一起来,大部分的逻辑都是通过发送Command,接受并处理Command完成。
1.1.2 协议设计和编解码
RocketMQ自己定义了一个通信协议,使得模块间传输的二进制消息和有意义的内容之间互相转换。协议格式如图4-2所示。
1_2

图1-2 RocketMQ的通信协议
(1)第一部分是大端4个字节整数,值等于第二,三,四部分长度总和 

(2)第二部分是大端4个字节整数,值等于第三部分的长度 

(3)第三部分是通过json 序列化的数据 

(4)第四部分是通过应用自定义二进制序列化的数据
消息的解码过程在RomotingCommand的decode函数里。
代码清单1-4 消息解码函数
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
    bodyData = new byte[bodyLength];
    byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;

}
对应的消息编码过程在RemotingCommand的encode函数中。
代码清单1-5 消息编码函数
public ByteBuffer encode() {

// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
    length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
    result.put(this.body);
}
result.flip();
return result;

}

1.1.3 Netty库
RocketMQ是基于Netty库来完成RemotingServer和RemotingClient具体的通信实现的,Netty是个事件驱动的网络编程框架,它屏蔽了Java Socket,Nio等复杂细节,用户只需用好Netty,就可以实现一个网络编程专家+并发编程专家水平的Server、Client网络程序。应用Netty有一定的门槛,需要了解它的EventLoopGroup,Channel,Handler模型以及各种具体的配置。RocketMQ利用Netty实现的通信类是NettyRemotingServer和NettyRemotingClient,用户也可以参考这两个类的实现来学习使用Netty。

推荐阅读:
RocketMQ

云栖社区官方出品

RocketMQ实战与原理解析
作者:杨开元
定价:59.00元
•RocketMQ由阿里开源,Apache开源项目,经受多年流量峰值考验,在多个性能指标上远超同类产品
•作者是阿里资深数据专家,有多年RocketMQ使用经验,深入研究RocketMQ源代码,写作前与RocketMQ官方团队有深入沟通
•云栖社区官方出品,得到RocketMQ官方研发团队以及业界的多位专家的肯定和推荐

阅读原文:http://product.dangdang.com/25290633.html

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 存储 监控
|
消息中间件 存储 负载均衡
一文读懂RocketMQ的高可用机制——消息发送高可用
一文读懂RocketMQ的高可用机制——消息发送高可用
298 1
|
4月前
|
消息中间件 存储 运维
|
4月前
|
消息中间件 负载均衡 Java
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息消费长轮训机制体系的原理分析
77 0
|
2月前
|
消息中间件 JavaScript RocketMQ
消息队列 MQ使用问题之过期删除机制的触发条件是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之过期删除机制的触发条件是什么
|
26天前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
38 0
|
26天前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
38 0
|
26天前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
30 0
|
4月前
|
消息中间件 存储 安全
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
【深入浅出RocketMQ原理及实战】「底层原理挖掘系列」透彻剖析贯穿RocketMQ的消息顺序消费和并发消费机制体系的原理分析
73 0
|
3月前
|
消息中间件 Apache RocketMQ
消息队列 MQ产品使用合集之是否提供机制检测消费的状态
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
下一篇
DDNS