RocketMQ 发送消息

简介: 前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息发送部分的实现细节。

引言

前面我们已经简单地介绍了 RocketMQ 的整体设计思路,本文着重其中消息发送部分的实现细节,更多关于 RocketMQ 的文章均收录于<RocketMQ系列文章>;

消息发送

RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向(oneway)。

  • 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
  • 异步:发送者向MQ执行发送消息API时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。

消息内容

private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;

Message的基础属性主要包括消息所属主题topic,消息Flag,扩展属性,消息体,事务ID。

消息Flag的定义如下,可以看出其主要和事务支持有关,关于RocketMQ的事务机制,我们后面会介绍:

public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

Message 扩展属性主要包含下面几个。

  • tag:消息TAG,用于消息过滤。
  • keys:Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息。
  • waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。
  • delayTimeLevel:消息延迟级别,用于定时消息或消息重试。

这些扩展属性存储在Message的properties中。

发送流程

消息发送流程主要的步骤:验证消息、查找路由、消息发送(包含异常处理机制)。

消息验证

消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度4M(maxMessageSize=102410244)。

查找路由

消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。

如果生产者中缓存了topic的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。

这里就有一个问题,如果整个消息队列服务刚运行,各个topic的路由信息是如何创建出来的?一般来说会有两个方案:

  1. 在生产者发送消息之前,就人工创建好各个topic的路由信息,这样做的好处是,可以根据该topic消息的实际需求,分配合适的broker数量和消息队列数量。一般来说,生产环境的服务都推荐以这种方式进行。
  2. 可以配置各个Broker,打开其自动创建topic的功能(BrokerConfig#autoCreateTopicEnable),这样就会在发送第一个消息时,动态的创建该topic的路由信息。

自动创建topic路由的过程如下:

  1. Broker如果开启了自动创建topic功能,则创建默认主题路由信息,并通过心跳包告知NameServer
  2. Producer查询本地路由缓存,未找到新topic的路由信息
  3. Producer查询NameServer,未找到新topic的路由信息
  4. Producer查询NameServer,找到默认主题路由信息
  5. Producer根据默认主题路由信息,将消息发送到默认主题的其中一个Broker
  6. 收到默认主题消息的Broker,根据消息的原始topic,创建相应的路由信息,并通过心跳包告知NameServer
  7. Producer下次发送该topic的消息时

    • 如果已经存在该消息的路由(定时拉取):则直接根据路由发送消息
    • 如果该消息的路由还没来得及同步:则继续发送到默认主题

从上面的自动创建topic流程中,我们会发现,如果新创建的topic信息没有来得及同步时,再次发送消息,可能会在其他 Broker 也创建该topic的队列,但是如果只是发送了一条该topic的消息后就等待一段时间,等路由信息同步完成后,再发送就会出现整个消息队列集群中,只有一个broker负责该topic,这样就对并发性产生较大的影响,试想一下,你的消息队列本来有10个Broker节点,他们都配置成自动创建Topic,然后10个Producer分别发送不同的topic消息,但是它们都只发送了一条消息就休息了一段时间,这10个Producer根据路由选择策略,碰巧都选择了同一个Broker,那么最后这个消息队列集群,就只有一个Broker在工作,其负担了所有topic的任务。

上面的例子,虽然有些极端,但是这也正是生产环境中不使用自动创建Topic策略的原因。除了这种极端情况,可能上例中的每个Producer都在对应topic路由信息同步前,将消息发送到了多个Broker,这些Broker都会创建相应topic,那么每个Topic都会由多个Broker负责,这样整个服务的并发能力就会得到充分的利用。

路由选择

根据路由信息选择消息队列,返回的消息队列按照broker、序号排序。举例说明,如果topicA在broker-a,broker-b上分别创建了4个队列,那么返回的消息队列如下:

[
  {
    "brokerName":"broker-a",
    "queueId": 0
  },
  {
    "brokerName":"broker-a",
    "queueId": 1
  },
  {
    "brokerName":"broker-a",
    "queueId": 2
  },
  {
    "brokerName":"broker-a",
    "queueId": 3
  },
  {
    "brokerName":"broker-b",
    "queueId": 0
  },
  {
    "brokerName":"broker-b",
    "queueId": 1
  },
  {
    "brokerName":"broker-b",
    "queueId": 2
  },
  {
    "brokerName":"broker-b",
    "queueId": 3
  }
]

首先消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送请求后,执行回调之前进行重试,由retryTimesWhenSendAsyncFailed指定。接下来就是循环执行:选择消息队列、发送消息,发送成功则返回,收到异常则重试。

如果在一次消息发送的过程中消息发送失败了,那么在下次重试的过程中,会排除掉上次失败的Broker。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) { //空表示第一次发送
        return selectOneMessageQueue(); // 循环选择一个队列
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {// 遍历所有队列
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {// 排除上一次的broker
                return mq;
            }
        }
        return selectOneMessageQueue(); // 如果只有一个broker,则继续循环选择套路
    }
}

public MessageQueue selectOneMessageQueue() { // 循环选择
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

从上面的实现中,可以看出在一次消息发送的过程中,可以通过重试机制绕开失败过的Broker,但是如果是发送多个消息,上述机制就无法绕开会失败的Broker。

我们前面已经说过,如果Broker宕机,可能会花费很长时间才能同步到各个Producer,那么怎么在Broker宕机的信息同步到Producer之前,绕开它而将消息发送到别的正常Broker上呢,这就不得不提RocketMQ的另一个容错机制————故障延迟机制。

首先我们需要知道RocketMQ在哪里存储失败的Broker,这些信息都存在LatencyFaultToleranceImpl中,其中有一个ConcurrentHashMap<String, FaultItem> faultItemTable存储了所有失败节点的信息。FaultItem中存储了如下内容:

// FaultItem fields
private final String name;
private volatile long currentLatency; //请求该节点的耗时
private volatile long startTimestamp; //预估下次可用的时间点

public boolean isAvailable() { // 当前时间大于等于预估可用点时,则认为可用
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

FaultItem的信息,在每次消息发送成功,和消息发送失败时,进行更新。

  • 如果消息发送成功,currentLatency赋值为本次请求的实际耗时
  • 消息发送失败,currentLatency赋值为30s

startTimestamp是根据currentLatency进行设置的,RocketMQ将currentLatency分成了不同的档位,不同档位的currentLatency会对应不同的notAvailableDuration,然后:
startTimestamp=System.currentTimeMillis() + notAvailableDuration
currentLatency与notAvailableDuration的对应关系如下图:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 按从大到小的顺序,找到 currentLatency 所在的区间,然后输出该区间对应的不可用时长
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}

由此,我们可以算出如果消息发送失败,那么RocketMQ正常来说会禁用该Broker十分钟。

知道了RocketMQ如何存储失败节点,在让我们来看看它是如何利用该信息,达到避开失败节点的效果。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, /*通常情况下,指上次请求失败时用到的节点*/final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 外层轮询,下次请求时会选择下一个队列
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index用于内层轮询,从而排除不可用的节点
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 验证可用性,latencyFaultTolerance存储了各个Broker发送消息的耗时,已经预估的下次可用时间点
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // 如果是第一次请求,并且可用,则直接返回
                    // 如果是重试请求,则只使用brokerName等于lastBrokerName相同的,这点大家肯定有疑问:为啥要使用上次失败的,其实这里的lastBrokerName不单单指上次发送失败的节点,
                    // 它还能蕴含推荐节点的信息,本函数的后半段中会看到如何将推荐节点的信息传递到lastBrokerName
                    // 但是我觉得这样写的一个弊端是:当前可用节点,如果和上次失败时所用的节点不一致时,就会被排除掉,这也会影响效率
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // 走到这一步,代表所有节点都不可用(首次请求)或者上一次失败时用到的节点仍然不可用(重试请求)
            // 随后,将所有不可用的节点,按照潜在可用性(当前可用与否>上次使用该节点时的调用耗时>预估的下次可使用时间),进行了排序,然后选择最优的结果
            // 这里不用担心,连续重试时pickOneAtLeast每次都选择了相同节点,因为其内部也是用了轮训机制,会从最优->次优的顺序给出下一次推荐的节点
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //getQueueIdByBroker这个函数名也是惊到我了,完全和其功能不匹配,本行代码意在得到推荐节点是否仍然存在可写队列,如果存在,得出队列数
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // 我们得到的推荐节点存在可写队列
            if (writeQueueNums > 0) {
                // 至此我们已经拿到了一个推荐节点,但是接下来代码的作者并没有简单地根据推荐节点来寻找队列,而是靠外层轮训找了下一个队列
                // 如果再次重试过程中没有发生路由信息更新的话,该队列应该仍然是不可用的,并且很可能仍是最初失败的Broker节点的队列,
                // 为什么这么说:假如消息队列为[BrokerA1,BrokerA2,BrokerA3,BrokerA4,BrokerB1,BrokerB2],只有当上一次轮训到BrokerA4时,这里才会跳过BrokerA而得到BrokerB的队列
                // 而且,可能下次更新路由表时,该信息可能就会成为过期数据而被GC,我猜作者是觉得反正这个数据快没用了,不如把它替换成刚才得到的推荐节点信息,这样可以少new一个对象,还能增加下次查找时的效率
                // 之所以说它能增加效率,是因为这个过程实质上是将一个很可能宕机的节点队列换成了最可能可用的节点信息,那么下次再轮训到这个节点时,实际上就跳过了寻找推荐节点的过程
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    // 把得到的队列的BrokerName改成我们前面得到的推荐节点,这样如果再请求失败并且重试的时候,lastBrokerName其实存储的就是推荐节点的信息了,下次再执行本函数时就会优先使用推荐节点的其他队列
                    mq.setBrokerName(notBestBroker);
                    // 重新计算其队列编号,因为得到的这个队列数可能和推荐节点的队列数不一致,如果用了错误的队列序号,消息发送到Broker那时,肯定会报错
                    // 因此,这里基于外层轮询使用的index,对本次使用的队列编号进行了计算,我觉得这里最终达到的是随机选择的效果
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else { // 推荐节点不存在可写队列了,说明该节点可能已经宕机,并且NameServer已经删除了其路由信息,并且已经同步过来了
                // 这时候可以将该节点从延迟统计表中删除,不在考虑该节点
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // 如果拿到的推荐节点已经不存在可写队列了,就随机选一个队列
        return tpInfo.selectOneMessageQueue();
    }
    // 默认策略
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

在我看来,RocketMQ的队列选择算法很恐怖,我不确定是特意设计成这样的,还是历史发展出来的奇怪产物。Github仓库中,也有很多人提issue问这个函数的设计深意。而且,其他人的文章中对该函数的介绍都是一笔带过。我这里,以我的理解对该算法进行了详细的分析,可能有些地方理解的不对,希望大家在留言中指出。

简单的说,上述算法按照如下流程工作:

  1. 轮训所有队列,通过LatencyFaultTolerance找到可用队列
  2. 如果未找到任何可用队列,通过LatencyFaultTolerance存储的信息,按照三个纬度的可用性排序(当前可用与否>上次使用该节点时的调用耗时>预估的下次可使用时间),选出最可能可用的队列
  3. 如果上述两个步骤都没有选出队列,则按照最简单的轮训找到下一个队列

消息发送方式

  1. 根据MessageQueue获取Broker的网络地址。如果找不到Broker信息,则抛出MQClientException,提示Broker不存在。
  2. 为消息分配全局唯一ID,如果消息体默认超过4K(compressMsgBodyOverHowMuch), 会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。
  3. 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
  4. 构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的flag不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
  5. 根据消息发送方式,同步、异步、单向方式进行网络传输。
  6. 如果注册了消息发送钩子函数,执行after逻辑。注意,就算消息发送过程中发生RemotingException、MQBrokerException、 InterruptedException时该方法也会执行。

异步发送
消息异步发送是指消息生产者调用发送的API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue来控制,默认为65535。异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。

单向发送
单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。

批量发送

批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。

RocketMQ对批量消息使用固定格式进行存储,如下图所示。
batch-message

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《RocketMQ技术内幕》
[2]《RocketMQ实战与原理解析》
[3] 老生常谈——利用消息队列处理分布式事务
[4] RocketMQ架构解析
[5] MappedByteBuffer VS FileChannel 孰强孰弱?
[6] 文件 IO 操作的一些最佳实践
[7] 海量数据处理之Bloom Filter详解
[8] rocketmq GitHub Wiki

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
1866 1
rocketmq客户端发送消息报错和超时问题
|
6月前
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何发送消息
3分钟白话RocketMQ系列—— 如何发送消息
124 0
|
7月前
|
消息中间件 SQL 弹性计算
RocketMQ中使用Java客户端发送消息和消费的应用
本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示
505 1
|
11月前
|
消息中间件 前端开发 Java
Springcloud之Rocketmq发送消息
Springcloud之Rocketmq发送消息
|
11月前
|
消息中间件 Java 网络安全
rocketmq发送消息报错
rocketmq发送消息报错
|
存储 消息中间件 RocketMQ
RocketMQ给broker发送消息确定Commitlog的写入的位置
问题 有一个疑问,当client给broker发送消息的时候,怎么知道在commitlog的第几个字节开始写呢?
153 0
RocketMQ给broker发送消息确定Commitlog的写入的位置
|
消息中间件 RocketMQ
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
209 0
不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息
|
消息中间件 Java RocketMQ
rocketmq发送消息底层分析
企业中一般都会封装rocketmq 同步 异步 单向方法,你只需要配置好nameserver地址 topic tag 消息体等,然后调用封装方法进行发送即可。 流程差不多如下 1、导入mq依赖
175 0
rocketmq发送消息底层分析
|
消息中间件 存储 前端开发
同步异步调用,并谈谈消息队列mq;RocketMQ发送消息和消费消息测试类
同步调用优点: 时效性强,打电话、直播,很快可以得到结果 同步调用的问题:
483 1
|
SQL 消息中间件 Java
SpringBoot整合RocketMQ发送消息过滤
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。 对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤
264 0