RocketMQ高手之路系列之十:RocketMQ网络通信原理分析(一)

简介: 通过以上分析可知,RocketMQ实际是在原生Netty之上进行了自己的封装。最后一张图来说明NameServer启动过程中关于Netty启动的部分。在后续的文章中我们再着重分析RocketMQ如何高效使用Netty框架。

引言

我们都知道RocketMQ是高性能的消息中间件,其高性能不仅体现在其优秀的消息吞吐量上,也体现在其基于Netty实现的高性能通信能力上。接下来将通过几篇文章来阐述RocketMQ的通信模块。通过RocketMQ对于通信模块的设计分析,我们在日后需要设计中间件关于通信模块时,其实也可以参考以及借鉴已经成熟的中间件的设计,同时结合自身业务进行改进。

  • 通信架构说明
  • 以NameServer启动为例
  • 消息编解码
  • 总结

一、通信架构说明

RocketMQ的网络通信模块主要实现在remoting模块中,从模块中我们可以得知RocketMQ使用netty进行底层的通信实现,同时在protocol中自定义了通信协议。

image.png

最主要的类关系如下所示:

image.png

(1)RemotingService接口

RemotingService 作为顶层接口定义了三个主要的方法,主要包括启动netty服务、关闭netty服务以及注册RPC钩子处理请求前后的逻辑。

public interface RemotingService {
  //开启服务
    void start();
  //停止服务
    void shutdown();
  //注册RPC钩子
    void registerRPCHook(RPCHook rpcHook);
}

(2)RPCHook 接口

其中RPCHook 接口定义了请求前后进行的逻辑处理,

public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}

(3)服务端与客户端接口

RemotingServerRemotingClient 接口分别继承了RemotingService 接口,并进行了自己的业务扩展。

RemotingServer 接口

public interface RemotingServer extends RemotingService {
  //注册处理请求的处理器, 根据requestCode, 获取处理器,处理请求
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    int localListenPort();
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
  //单向发送消息,只管发送消息,不管消息发送的结果
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
}

二、消息编解码

(1)通信协议设计

image.png

(2)编码

remoting模块对于消息进行了自定义协议,将发送的消息以及收到的消息封装为RemotingCommand对象。

 public ByteBuffer encode() {
        // 1> header length size
        int length = 4;
        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;
        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }
        ByteBuffer result = ByteBuffer.allocate(4 + length);
        // length
        result.putInt(length);
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // header data
        result.put(headerData);
        // body data;
        if (this.body != null) {
            result.put(this.body);
        }
        result.flip();
        return result;
    }

(3)解码

 public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    // 获取byteBuffer的总长度
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);
    // 保存header data
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            // 获取消息体的数据
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;
        return cmd;
    }

三、以NameServer启动为例

在了解remoting模块的核心接口之后,我们接下来看下具体的实现过程。其实在如NameServer启动过程中,它本身就会作为一个Netty的服务端进行启动。我们这里先忽略掉NameServer启动过程中的其他的配置操作,着重对Netty作为服务端启动的流程。大致的启动流程如下所示:

image.png

NameServer实际作为Netty服务端启动底层网络连接的,我们都知道它的作用是作为服务端提供给Broker进行注册以及客户端向其拉取路由信息。

NameServer启动过程中实际是创建了NettyRemotingServer,而NettyRemotingServer是RocketMQ自己开发的网络连接组件,当然它的底层实际是基于Netty的接口实现的ServerBootstrap。下列是start的方法,同样我们只关注Netty服务器的启动。

public static NamesrvController start(final NamesrvController controller) throws Exception {
        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }
    //初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
    //通过Runtime类注册了一个JVM关闭时的shutdown的钩子
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));
        controller.start();
        return controller;
    }

其中初始化的方法如下所示:

public boolean initialize() {
    //加载配置
        this.kvConfigManager.load();
    //构建Netty服务器
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    //Netty的分作线程池
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    //将工作线程池分配给Netty服务器
        this.registerProcessor();
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }
        return true;
    }

初始化完成之后进行启动,我们可以看到实际启动的是NettyRemotingServer

 public void start() throws Exception {
        this.remotingServer.start();
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

NettyRemotingServer启动过程如下代码所示:

@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
    //配置启动Netty服务器
        ServerBootstrap childHandler =
          //各种网络配置
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                //设置网络请求处理器,当Netty服务器收到网络请求后,就会有这些Handler进行处理
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                              //编解码
                                new NettyEncoder(),
                                new NettyDecoder(),
                                //空闲连接管理
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //网络连接管理
                                new NettyConnectManageHandler(),
                                //网络请求处理
                                new NettyServerHandler()
                            );
                    }
                });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
        try {
          //启动Netty服务器,绑定对应的端口号
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

四、总结

通过以上分析可知,RocketMQ实际是在原生Netty之上进行了自己的封装。最后一张图来说明NameServer启动过程中关于Netty启动的部分。在后续的文章中我们再着重分析RocketMQ如何高效使用Netty框架。

image.png

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
机器学习/深度学习 PyTorch TensorFlow
卷积神经网络深度解析:从基础原理到实战应用的完整指南
蒋星熠Jaxonic,深度学习探索者。深耕TensorFlow与PyTorch,分享框架对比、性能优化与实战经验,助力技术进阶。
|
8月前
|
监控 负载均衡 安全
WebSocket网络编程深度实践:从协议原理到生产级应用
蒋星熠Jaxonic,技术宇宙中的星际旅人,以代码为舟、算法为帆,探索实时通信的无限可能。本文深入解析WebSocket协议原理、工程实践与架构设计,涵盖握手机制、心跳保活、集群部署、安全防护等核心内容,结合代码示例与架构图,助你构建稳定高效的实时应用,在二进制星河中谱写极客诗篇。
WebSocket网络编程深度实践:从协议原理到生产级应用
|
8月前
|
数据采集 监控 网络安全
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
475 3
VMware Cloud Foundation Operations for Networks 9.0.1.0 发布 - 云网络监控与分析
|
8月前
|
机器学习/深度学习 大数据 关系型数据库
基于python大数据的青少年网络使用情况分析及预测系统
本研究基于Python大数据技术,构建青少年网络行为分析系统,旨在破解现有防沉迷模式下用户画像模糊、预警滞后等难题。通过整合多平台亿级数据,运用机器学习实现精准行为预测与实时干预,推动数字治理向“数据驱动”转型,为家庭、学校及政府提供科学决策支持,助力青少年健康上网。
|
9月前
|
机器学习/深度学习 人工智能 算法
卷积神经网络深度解析:从基础原理到实战应用的完整指南
蒋星熠Jaxonic带你深入卷积神经网络(CNN)核心技术,从生物启发到数学原理,详解ResNet、注意力机制与模型优化,探索视觉智能的演进之路。
768 11
|
9月前
|
机器学习/深度学习 算法 搜索推荐
从零开始构建图注意力网络:GAT算法原理与数值实现详解
本文详细解析了图注意力网络(GAT)的算法原理和实现过程。GAT通过引入注意力机制解决了图卷积网络(GCN)中所有邻居节点贡献相等的局限性,让模型能够自动学习不同邻居的重要性权重。
1554 0
从零开始构建图注意力网络:GAT算法原理与数值实现详解
|
9月前
|
安全 测试技术 虚拟化
VMware-三种网络模式原理
本文介绍了虚拟机三种常见网络模式(桥接模式、NAT模式、仅主机模式)的工作原理与适用场景。桥接模式让虚拟机如同独立设备接入局域网;NAT模式共享主机IP,适合大多数WiFi环境;仅主机模式则构建封闭的内部网络,适用于测试环境。内容简明易懂,便于理解不同模式的优缺点与应用场景。
1254 0
|
11月前
|
机器学习/深度学习 人工智能 PyTorch
零基础入门CNN:聚AI卷积神经网络核心原理与工业级实战指南
卷积神经网络(CNN)通过局部感知和权值共享两大特性,成为计算机视觉的核心技术。本文详解CNN的卷积操作、架构设计、超参数调优及感受野计算,结合代码示例展示其在图像分类、目标检测等领域的应用价值。
588 7
|
10月前
|
数据采集 存储 数据可视化
Python网络爬虫在环境保护中的应用:污染源监测数据抓取与分析
在环保领域,数据是决策基础,但分散在多个平台,获取困难。Python网络爬虫技术灵活高效,可自动化抓取空气质量、水质、污染源等数据,实现多平台整合、实时更新、结构化存储与异常预警。本文详解爬虫实战应用,涵盖技术选型、代码实现、反爬策略与数据分析,助力环保数据高效利用。
511 0
|
机器学习/深度学习 算法 测试技术
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析
本文探讨了基于图的重排序方法在信息检索领域的应用与前景。传统两阶段检索架构中,初始检索速度快但结果可能含噪声,重排序阶段通过强大语言模型提升精度,但仍面临复杂需求挑战
461 0
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析