3分钟白话RocketMQ系列—— 如何发送消息

简介: 3分钟白话RocketMQ系列—— 如何发送消息

白话3分钟,快速了解RocketMQ如何发送消息。

看完如果不了解,欢迎来打我。

我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。

那接下来,我们白话一下,RocketMQ是如何发送消息的,揭秘消息生产全过程。

注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本


关键字摘要


  • 哪些消息类型?
  • 发给谁?
  • 怎么发?
  • 怎么知道发成功了还是失败了?
  • 发失败了怎么办?


Q1: RocketMQ有哪些消息类型?


RocketMQ生产消息时,支持多种「消息类型」:

  • 普通消息:发送普通消息。
SendResult send(final Message msg);
  • 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg);
  • 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序。
SendResult send(final Message msg, final MessageQueue mq);
  • 事务消息:实现分布式事务。(属于分布式事务范畴,区别较大,本文不再展开讨论,后面单独写一篇针对「事务消息」的分析)

上面列举的三种send方法,都是以同步发送模式为例。

定时/延迟消息从发送方式角度来说,不算一种独立的消息类型。


Q2:RocketMQ怎么知道一条消息要发送给谁?


一般我们要发送一条消息给RocketMQ,需要创建这样一个消息体。

Message msg = new Message( "TestTopic", "Hello World".getBytes() );

在这个消息体里面,我们只单纯指定了要发送的Topic名字,以及要发送的消息内容。

那么,RocketMQ-client怎么知道这条消息要发送到RocketMQ集群中的哪一个broker上呢?

这里需要了解下RocketMQ中Topic的「路由注册与发现机制」。

640.png

RocketMQ基本架构

Topic 路由注册与发现:

  • Broker 每30秒向 NameServer 发送心跳包,里面包含Topic的路由信息,包括主题的读写队列数和操作权限等。NameServer会保存这些路由信息,并记录最后一次收到 Broker 心跳包的时间(NameServer每10秒根据记录的时间戳清理已经失联120秒以上的 Broker)。
  • 生产者每30秒获取一次主题的路由信息。这意味着消息生产者不会立即知道有新的 Broker 加入或者被移除。

640.png

Topic路由信息

Topic的路由信息,包括了Topic的 队列queue和broker的映射关系 ,那么如何利用这个Topic的路由信息呢?

我们需要根据前面的不同「消息类型」进行分别讨论:

  • 普通消息:默认采用轮询机制,消息会依次发送到Topic的每个可用的 Broker 的某个队列queue上,以实现负载均衡。
  • 普通有序消息:根据传递的 MessageQueueSelector 和消息体 msg 内容,计算可以投递的队列queue,然后发送消息。(可以类比分库分表中的分表计算写入的方式)
  • 严格有序消息:根据传递的 MessageQueue 信息,强制消息发送到对应队列queue上。(可以类比分库分表中,强制指定物理表写入的方式)

根据消息类型获取到目标队列queue后,就可以根据Topic路由信息发送消息到指定broker上了。


Q3:怎么发送一条消息?


从发送模式角度来说,RocketMQ有三种「消息发送模式」:

  • 同步发送:调用发送消息方法后,同步阻塞,直到返回SendResult。
SendResult send(final Message msg);
  • 异步发送:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。
void send(final Message msg, final SendCallback sendCallback);
  • 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。
void sendOneway(final Message msg);

上面列举的三种send方法,都是以「普通消息」为例。

「消息类型」 和 「消息发送模式」 是 N*M 的关系,所以聪明的你一定已经想到了,存在9种不同组合(不包括事物消息),RocketMQ也是在接口中定义了9种不同方法。


Q4: 发送后,怎么知道消息发成功了还是失败了?


前面介绍了三种「消息发送模式」,其中「单向发送」属于不可靠发送,我们无法知道是否发送成功。

而「同步发送」和「异步发送」都是可靠发送,我们能够获取发送状态,知道是否成功。

在「同步发送」中,我们可以根据SendResult中的sendStatus属性判断是否发送成功。

640.png

SendResult类属性

在「异步发送」中,我们可以自定义实现SendCallbackonSuccess()方法和onException()方法,来判断消息是否发送成功。

640.png

SendCallback接口定义


Q5: 消息发送失败了怎么办?


如果消息发送失败了,RocketMQ-client默认有重试机制,以确保消息的高可用性。

前面提到,生产者每30秒获取一次主题的路由信息,所以即使某个 Broker 宕机,消息发送者可能无法立即察觉到它的宕机状态。

但是,当消息发送者向某个 Broker 发送消息后,如果返回异常,生产者会在接下来的一段时间内(例如5分钟)避免再次选择该 Broker 上的队列来发送消息。这样做的目的是规避可能发生故障的 Broker。

当然了,用户也能根据返回的异常,自己定义业务重试、补偿机制。

需要注意的是,不同「消息类型」和「消息发送模式」的RocketMQ-client默认重试机制不同。

消息类型:

  • 普通消息:无顺序性要求,异常时RocketMQ-client默认重试
  • 普通有序消息:异常时RocketMQ-client默认不重试,可以用户自己捕获异常重试,并发送到其他队列。
  • 严格有序消息:保证严格有序,异常时RocketMQ-client默认不重试,可以用户自己捕获异常重试。

注意:有序消息异常时RocketMQ-client都是默认不重试

消息发送模式:

  • 同步发送:配置retryTimesWhenSendFailed默认重试次数。
  • 异步发送:配置retryTimesWhenSendAsyncFailed默认重试次数。
  • 单向发送:无重试机制,存在丢失消息的风险。

注意:单向发送模式异常时RocketMQ-client默认不重试


总结


  • 有哪些消息类型:普通消息、有序消息、事务消息
  • 发给谁?:Topic路由信息注册与发现机制、普通消息轮询发送、有序消息指定selector或者queue发送
  • 怎么发?:同步发送、异步发送、单向发送
  • 怎么知道发成功了还是失败了?:同步&异步都能够获取发送状态(可靠发送)、单向发送不可靠
  • 发失败了怎么办?: 失败重试机制

3分钟到了吗?应该对RocketMQ如何生产消息有全面了解了吧。

如果还想了解更多,欢迎关注下一期内容。


往期热门笔记合集推荐:

  • HBase原理与实战笔记合集
  • MySQL实战笔记合集
  • Canal/Otter源码与实战笔记合集
  • Java实战技巧笔记合集
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
消息中间件 缓存 物联网
MQTT常见问题之MQTT发送消息到阿里云服务器被拒如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3分钟白话RocketMQ系列—— 如何保证消息不丢失
3811 1
|
消息中间件 存储 缓存
3分钟白话RocketMQ系列—— 如何消费消息
3分钟白话RocketMQ系列—— 如何消费消息
734 0
|
消息中间件 Apache RocketMQ
rocketmq客户端发送消息报错和超时问题
org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <10.0.21.69:10911> timeout, 1000(ms)、 closeChannel: close the connection to remote address
3403 1
rocketmq客户端发送消息报错和超时问题
|
5月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
存储 缓存 物联网
MQTT常见问题之MQTT发送消息过多内存不够处理不过来如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
消息中间件 存储 canal
3分钟白话RocketMQ系列—— 如何保证消息顺序性
3分钟白话RocketMQ系列—— 如何保证消息顺序性
1667 1
3分钟白话RocketMQ系列—— 如何保证消息顺序性
|
消息中间件 存储 Kafka
3分钟白话RocketMQ系列—— 核心概念
3分钟白话RocketMQ系列—— 核心概念
285 1
|
消息中间件 JSON 数据格式
使用RabbitMQ控制台查看和发送消息
使用RabbitMQ控制台查看和发送消息
1536 0
|
消息中间件 SQL 弹性计算
RocketMQ中使用Java客户端发送消息和消费的应用
本教程将总结使用java客户端消息发送和消费各种场景, 并Demo演示