RocketMQ高手之路系列之二:RocketMQ之消息通信

简介: 本文主要从源码角度分析RocketMQ的底层通信机制以及RPC调用的过程。对于RocketMQ通信机制的深入理解,是我们分析和领会整个RocketMQ系统消息流转流程的基石。消息消息底层如何流转总结

引言

本文主要从源码角度分析RocketMQ的底层通信机制以及RPC调用的过程。对于RocketMQ通信机制的深入理解,是我们分析和领会整个RocketMQ系统消息流转流程的基石。

  • 消息
  • 消息底层如何流转
  • 总结

一、消息

我们可以设想一下,如果自己是RocketMQ的设计者,我们该如何设计一个消息系统。大家都知道RocketMQ是消息中间件,那么首先要解决的问题就是消息本身该如何设计。因为整个系统中它是传递的对象,是数据的载体。那么对于消息本身来说,我们需要定义消息的格式,这样客户端与服务端之间可以遵循定义好的消息格式来进行通信。在RocketMQ体系中,通过RemotingCommand对象来进行交互,,它对数据进行了封装。

(1)消息协议设计以及编解码

我们看下RemotingCommand中的部分属性如下所示:

private int code;   //操作码
private LanguageCode language = LanguageCode.JAVA;   //实现语言
private int version = 0;   //程序版本
private int opaque = requestId.getAndIncrement();   //reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应
private int flag = 0;    //区分是普通RPC还是onewayRPC的标志
private String remark;    //传输自定义文本信息
private HashMap<String, String> extFields;    //自定义扩展信息
private transient CommandCustomHeader customHeader;

我们再看下具体的消息的格式是怎样的,如下图所示:

image.png

消息长度:总长度,四个字节存储,占用一个int类型;


序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;


消息头数据:经过序列化后的消息头数据;


消息主体数据:消息主体的二进制字节数据内容。


所以消息本身结构以及消息如何传递,都是我们需要了解的

image.png

图片来源于网络


我们看下remoting模块中的protocal文件夹下的相关定义类,如下所示:

image.png

对于消息还需要进行编码以及解码操作来提高消息传递的效率,所以RemotingCommand 中还提供了编码、解码的方法。RemotingCommand 的encode()源码如下所示:

 public ByteBuffer encode() {
        // 1> header length size
        int length = 4; //存储头部长度
        // 2> header data length
        byte[] headerData = this.headerEncode(); //报文头部数据
        length += headerData.length;
        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }
        ByteBuffer result = ByteBuffer.allocate(4 + length);  //分配字节缓冲区
        // length
        result.putInt(length);
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // header data
        result.put(headerData);
        // body data;
        if (this.body != null) {
            result.put(this.body);
        }
        result.flip();  //将缓冲区翻转,用于将ByteBuffer放到网络通道中进行传输
        return result;
    }

我们再看下消息的解码操作,源码中的实现如下所示:

//解码操作
public static RemotingCommand decode(final byte[] array) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(array);
        return decode(byteBuffer);
    }
//
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;
        return cmd;
    }

(2)消息传输方式

RocketMQ系统中,消息的通信方式主要有三种:

1、可靠的同步传输;

2、可靠的异步传输;

3、单向传输;

二、消息如何流转

我们知道RocketMQ底层的通信是通过Netty实现。其中NettyRemotingClient与NettyRemotingServer是RemotingClient和RemotingServer这两个接口的实现类,也是比较重要的实现类。NettyRemotingClient与NettyRemotingServer也同时继承了NettyRemotingAbstract抽象类,该类抽象了invokeSync、invokeOneway等公用的方法实现。


(1)客户端请求发送

首先我们来看下 NettyRemotingClient中的重要源码分析,如下所示:

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
...
}

我们可以看出客户端继承了NettyRemotingAbstract抽象类同时实现了RemotingClient接口。其中比较重要的方法包括同步调用、异步调用以及单向调用。我么通过异步调用的方式来举例,NettyRemotingClient 根据地址信息获取或者创建channel,接着会进行invokeAsyncImpl方法的调用,此时客户端会将数据流转给公共的抽象类NettyRemotingAbstract 进行统一处理,在此类中完成真正的请求发送动作。发送消息的源码如下所示:

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        long beginStartTime = System.currentTimeMillis();
        //请求的ID
        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                once.release();
                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
            }
      //构建ResponseFuture 
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
            //将ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
              //使用Netty的channel进行请求数据的发送
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                  //消息发送后执行动作
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                          //如果消息发送成功,则直接将属性置为true,同时返回
                            responseFuture.setSendRequestOK(true);
                            return;
                        }
                        //如果失败
                        requestFail(opaque);
                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
              //异常处理过程
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

此处有需要留意的部分,如下:

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

opaque 表示请求码,用来标识不同的请求。这个请求码会与ResponseFuture映射在一起。

(2)服务端响应流程

服务端代码如下,继承了NettyRemotingAbstract同时实现了RemotingServer

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
...
}

在该类中的start方法如下所示,它的主要作用是启动Netty服务器

@Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                  //向管道中添加数据处理逻辑
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()
                            );
                    }
                });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }

三、总结

本文主要介绍了RocketMQ在消息传递过程中底层通信的过程,同时阐述了消息的格式以及相应的编解码的过程。下篇文章会和大家来一起继续拆解RocketMQ上层的流程。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
797 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 存储 数据可视化
【RocketMq-生产者】消息发送者参数详解
首先注意本次讨论的RokcetMq源码版本为 4.9.4,距离5.0发布 的没有多久。 这一节针对RocketMq的生产者请求发送的部分细节进行阐述,主要包含了下面的内容:DefaultMQProducer 为生产者默认对象,这个对象继承自 ClientConfig,里面包含了请求者的通用配置,所以可以拆分为两个部分进行理解,第一部分为ClientConfig,第二部分为DefaultMQProducer。
713 0
|
消息中间件 Java RocketMQ
【消息中间件】默认RocketMQ消息发送者是如何启动的?
上一篇文章,主要介绍了RocketMQ消息发送-请求与响应,了解了消息发送的请求参数和响应结果,今天我们主要来学习默认消息发送者的源码,看看消息发送主要是做了那哪些事情。
|
消息中间件 负载均衡 算法
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(上)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 存储 负载均衡
【消息中间件】默认的RocketMQ消息消费者是如何启动的?(下)
在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67805 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2790 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
727 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
271 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
437 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息