RocketMQ 同步发送、异步发送和单向发送,如何选择?

简介: 本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。

你好,我是猿java。

在 RocketMQ 中,有 3种简单的消息发送方式:同步发送、异步发送和单向发送。这篇文章,我们将详细分析这三种发送方式的原理、优缺点、使用场景以及使用该方式是否会丢失数据。

本文源码基于: Apache RocketMQ release-5.2.0

同步发送

原理分析

在同步发送模式下,RocketMQ 默认采用同步刷盘方式,当生产者将消息发送到 Broker 后,会等待 Broker 的响应(默认超时 5分钟),Broker 接收消息后,会将其写入内存缓存,并进行刷盘操作。因此,如果 Broker 响应成功,代表消息一定成功写入磁盘。

rocketmq-sync-send2.png
g

同步发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:创建一个Message对象,设置Topic、Tag标签和消息体;
  3. 发送消息:调用DefaultMQProducersend方法;
  4. 等待响应:发送方会阻塞等待服务器的响应,直到收到确认消息;

rocketmq-sync-send.png

如下示例代码为一个完整的同步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 SyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定 Topic,Tag和消息体
    Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
    // 5、发送同步消息
    SendResult sendResult = producer.send(msg);
    // 6、通过 sendResult 判断消息是否成功送达
    System.out.printf("message send result:" + sendResult);
    // 7、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的同步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeSync:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)

优缺点

优点

  • 简单易用。
  • 可靠性高,发送方可以确认消息是否成功发送,一旦发送成功,消息就已经写入磁盘,消息不会丢失。

缺点

  • 延迟较高,需要等待服务器的响应。
  • 吞吐量可能受限于网络延迟和服务器性能。

使用场景

适用于对消息可靠性要求较高的场景,如订单系统、金融交易、重要的消息通知等。

异步发送

原理分析

在异步发送模式下,RocketMQ 默认采用异步刷盘方式,当生产者发送消息到 Broker 后,消息写入内存缓存成功后,Broker 立即返回响应(默认超时 5分钟),后台线程再异步将消息批量写入磁盘。因此,这种方式提高了系统的吞吐量和性能,但在系统崩溃时可能会丢失部分未刷盘的消息。

img

异步发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:同样创建一个Message对象。
  3. 发送消息:调用DefaultMQProducersend方法,但传递一个SendCallback回调对象。
  4. 处理响应:回调函数会在消息发送成功或失败时被调用。

rocketmq-async-send.png

如下示例代码为一个完整的异步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 AsyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
    // 5、发送异步消息,SendCallback是处理异步回调的方法
    producer.send(msg, new SendCallback() {
   
      @Override
      public void onSuccess(SendResult sendResult) {
     // 成功回调
        System.out.println("message send success: " + sendResult);
      }
      @Override
      public void onException(Throwable throwable) {
     // 失败回调
        System.out.println("message send fail: " + throwable);
      }
    });
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的异步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeAsync:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)

优缺点

优点

  • 非阻塞,发送方可以继续执行其他任务,提高吞吐量。
  • 延迟较低,适用于对响应时间敏感的场景。

缺点

  • 实现复杂度较高,需要处理异步回调。
  • 可靠性相对降低,需要处理失败重试等问题。
  • 无法保证发送出去的数据不丢失。

使用场景

适用于对响应时间要求较高的场景,如实时数据处理、日志采集、消费信息的推送等。

单向发送

原理分析

单向(OneWay)发送是一种只负责发送消息而不等待任何响应的方式。生产者将消息发送到 Broker 后(默认超时 5分钟),不关心消息是否成功到达或被持久化,主要依赖 Broker 进行刷盘操作,单向发送通常与异步刷盘结合使用,以提高发送效率。

rocketmq-async-send2.png

单向发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:创建一个Message对象。
  3. 发送消息:调用DefaultMQProducersendOneway方法。

rocketmq-oneway-send.png

如下示例代码为一个完整的单向发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 OneWayGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
    // 5、发送单向消息
    producer.sendOneway(msg);
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的单向发送主要涉及以下几个关键类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeOneway:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)

优缺点

优点

  • 非常高效,延迟最低。
  • 适用于对可靠性要求不高的场景。

缺点

  • 无法确认消息是否成功发送。
  • 可靠性最低,消息可能丢失。

使用场景

适用于对可靠性要求不高的场景,如日志收集、监控数据上报等。

3种方式对比

发送方式 优点 缺点 使用场景
同步发送 可靠性高,简单易用 延迟较高,吞吐量受限 订单系统、金融交易、重要的消息通知等
异步发送 非阻塞,延迟较低 实现复杂度高,可靠性相对降低 实时数据处理、日志采集、消费信息的推送等
单向发送 高效,延迟最低 无法确认消息是否成功发送,可靠性最低 日志收集、监控数据上报等

如何选择?

同步发送

消息发送后会等待服务器的响应,整个过程业务是阻塞等待的,适用于对可靠性要求高的场景,比如 订单系统、金融交易等。

异步发送

消息发送后,不等待服务器响应,而是通过回调函数处理响应,适用于对响应时间要求高的场景,比如实时数据处理、日志采集、消费信息的推送等

单向发送

单向发送只负责发送消息而不等待任何响应的方式,也不需要对发送的状态、结果负责,适用于对可靠性要求不高的场景,比如日志收集、监控数据上报等。

每种发送方式都有其适用的场景和优缺点,具体如何选择,一定需要根据业务需求进行权衡。

总结

本文分析了 RocketMQ 同步发送、异步发送和单向发送三种方式的原理、优缺点以及使用场景,并且分析了每种方式涉及到的核心源码。

通过上文的介绍可以知道同步发送方式可以保证消息发送时不丢,但是性能相对其他两种方式差一些。

RocketMQ 是一款优秀的开源消息中间件,作为 Java程序员,建议多去阅读它的源码,吸收其中比较好的代码思维。

参考资料

rocketmq官网

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

目录
相关文章
|
23天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
16天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
20天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2576 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
18天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
3天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
2天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
162 2
|
20天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1576 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
|
22天前
|
编解码 JSON 自然语言处理
通义千问重磅开源Qwen2.5,性能超越Llama
击败Meta,阿里Qwen2.5再登全球开源大模型王座
971 14
|
3天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
214 2
|
17天前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
733 10