RocketMQ 同步发送、异步发送和单向发送,如何选择?

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本文详细分析了 RocketMQ 中同步发送、异步发送和单向发送三种消息发送方式的原理、优缺点及适用场景。同步发送可靠性高但延迟较大,适合订单系统等场景;异步发送非阻塞且延迟低,适用于实时数据处理等场景;单向发送高效但可靠性低,适用于日志收集等场景。文章还提供了示例代码和核心源码分析,帮助读者更好地理解每种发送方式的特点。

你好,我是猿java。

在 RocketMQ 中,有 3种简单的消息发送方式:同步发送、异步发送和单向发送。这篇文章,我们将详细分析这三种发送方式的原理、优缺点、使用场景以及使用该方式是否会丢失数据。

本文源码基于: Apache RocketMQ release-5.2.0

同步发送

原理分析

在同步发送模式下,RocketMQ 默认采用同步刷盘方式,当生产者将消息发送到 Broker 后,会等待 Broker 的响应(默认超时 5分钟),Broker 接收消息后,会将其写入内存缓存,并进行刷盘操作。因此,如果 Broker 响应成功,代表消息一定成功写入磁盘。

rocketmq-sync-send2.png
g

同步发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:创建一个Message对象,设置Topic、Tag标签和消息体;
  3. 发送消息:调用DefaultMQProducersend方法;
  4. 等待响应:发送方会阻塞等待服务器的响应,直到收到确认消息;

rocketmq-sync-send.png

如下示例代码为一个完整的同步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class SyncProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 SyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("SyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定 Topic,Tag和消息体
    Message msg = new Message("SyncTopic", "sync", "SyncMessage".getBytes("UTF-8"));
    // 5、发送同步消息
    SendResult sendResult = producer.send(msg);
    // 6、通过 sendResult 判断消息是否成功送达
    System.out.printf("message send result:" + sendResult);
    // 7、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的同步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeSync:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg)

优缺点

优点

  • 简单易用。
  • 可靠性高,发送方可以确认消息是否成功发送,一旦发送成功,消息就已经写入磁盘,消息不会丢失。

缺点

  • 延迟较高,需要等待服务器的响应。
  • 吞吐量可能受限于网络延迟和服务器性能。

使用场景

适用于对消息可靠性要求较高的场景,如订单系统、金融交易、重要的消息通知等。

异步发送

原理分析

在异步发送模式下,RocketMQ 默认采用异步刷盘方式,当生产者发送消息到 Broker 后,消息写入内存缓存成功后,Broker 立即返回响应(默认超时 5分钟),后台线程再异步将消息批量写入磁盘。因此,这种方式提高了系统的吞吐量和性能,但在系统崩溃时可能会丢失部分未刷盘的消息。

img

异步发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:同样创建一个Message对象。
  3. 发送消息:调用DefaultMQProducersend方法,但传递一个SendCallback回调对象。
  4. 处理响应:回调函数会在消息发送成功或失败时被调用。

rocketmq-async-send.png

如下示例代码为一个完整的异步发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class AsyncProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 AsyncGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("AsyncGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("AsyncTopic","async", "AsyncMessage".getBytes("UTF-8"));
    // 5、发送异步消息,SendCallback是处理异步回调的方法
    producer.send(msg, new SendCallback() {
   
      @Override
      public void onSuccess(SendResult sendResult) {
     // 成功回调
        System.out.println("message send success: " + sendResult);
      }
      @Override
      public void onException(Throwable throwable) {
     // 失败回调
        System.out.println("message send fail: " + throwable);
      }
    });
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的异步发送主要涉及以下几个关键源码类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeAsync:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(Message msg, SendCallback sendCallback)

优缺点

优点

  • 非阻塞,发送方可以继续执行其他任务,提高吞吐量。
  • 延迟较低,适用于对响应时间敏感的场景。

缺点

  • 实现复杂度较高,需要处理异步回调。
  • 可靠性相对降低,需要处理失败重试等问题。
  • 无法保证发送出去的数据不丢失。

使用场景

适用于对响应时间要求较高的场景,如实时数据处理、日志采集、消费信息的推送等。

单向发送

原理分析

单向(OneWay)发送是一种只负责发送消息而不等待任何响应的方式。生产者将消息发送到 Broker 后(默认超时 5分钟),不关心消息是否成功到达或被持久化,主要依赖 Broker 进行刷盘操作,单向发送通常与异步刷盘结合使用,以提高发送效率。

rocketmq-async-send2.png

单向发送主要涉及以下几个步骤:

  1. 创建Producer:创建一个Producer对象;
  2. 创建消息:创建一个Message对象。
  3. 发送消息:调用DefaultMQProducersendOneway方法。

rocketmq-oneway-send.png

如下示例代码为一个完整的单向发送流程:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class OneWayProducerTest {
   
  public static void main(String[] args) throws Exception {
   
    // 1、创建 producer,设置组名为 OneWayGroupTest
    DefaultMQProducer producer = new DefaultMQProducer("OneWayGroup");
    // 2、指定 NameServer的地址,以获取 Broker路由地址
    producer.setNamesrvAddr("x.x.x.x:9876");
    // 3、启动 producer
    producer.start();
    // 4、创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("OneWayTopic","oneway", "OneWayMessage".getBytes("UTF-8"));
    // 5、发送单向消息
    producer.sendOneway(msg);
    // 6、关闭 Producer
    producer.shutdown();
  }
}

RocketMQ 的单向发送主要涉及以下几个关键类和方法:

  • DefaultMQProducer:生产者类,负责发送消息。
  • MQClientAPIImpl#sendMessage:底层消息发送实现。
  • NettyRemotingClient#invokeOneway:通过 Netty 实现网络通信。
  • Broker 端的 SendMessageProcessor:处理发送请求。

源码参考:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendOneway(Message msg)

优缺点

优点

  • 非常高效,延迟最低。
  • 适用于对可靠性要求不高的场景。

缺点

  • 无法确认消息是否成功发送。
  • 可靠性最低,消息可能丢失。

使用场景

适用于对可靠性要求不高的场景,如日志收集、监控数据上报等。

3种方式对比

发送方式 优点 缺点 使用场景
同步发送 可靠性高,简单易用 延迟较高,吞吐量受限 订单系统、金融交易、重要的消息通知等
异步发送 非阻塞,延迟较低 实现复杂度高,可靠性相对降低 实时数据处理、日志采集、消费信息的推送等
单向发送 高效,延迟最低 无法确认消息是否成功发送,可靠性最低 日志收集、监控数据上报等

如何选择?

同步发送

消息发送后会等待服务器的响应,整个过程业务是阻塞等待的,适用于对可靠性要求高的场景,比如 订单系统、金融交易等。

异步发送

消息发送后,不等待服务器响应,而是通过回调函数处理响应,适用于对响应时间要求高的场景,比如实时数据处理、日志采集、消费信息的推送等

单向发送

单向发送只负责发送消息而不等待任何响应的方式,也不需要对发送的状态、结果负责,适用于对可靠性要求不高的场景,比如日志收集、监控数据上报等。

每种发送方式都有其适用的场景和优缺点,具体如何选择,一定需要根据业务需求进行权衡。

总结

本文分析了 RocketMQ 同步发送、异步发送和单向发送三种方式的原理、优缺点以及使用场景,并且分析了每种方式涉及到的核心源码。

通过上文的介绍可以知道同步发送方式可以保证消息发送时不丢,但是性能相对其他两种方式差一些。

RocketMQ 是一款优秀的开源消息中间件,作为 Java程序员,建议多去阅读它的源码,吸收其中比较好的代码思维。

参考资料

rocketmq官网

学习交流

如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注:猿java,持续输出硬核文章。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 存储 缓存
服务异步通信--RabbitMQ
服务异步通信--RabbitMQ
94 0
|
7月前
|
消息中间件 测试技术 RocketMQ
消息队列 MQ产品使用合集之在异步发送消息函数sendMessage()中出现了错误,错误代码为-3,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
4月前
|
消息中间件 Kafka 数据安全/隐私保护
RabbitMQ异步通信详解
RabbitMQ异步通信详解
115 16
|
6月前
|
消息中间件 传感器 负载均衡
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ使用问题之如何配置一主一从的同步复制模式
|
6月前
|
消息中间件 运维 RocketMQ
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
MetaQ/RocketMQ 原理问题之slave broker是从master同步信息的问题如何解决
|
7月前
|
消息中间件 存储 Java
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
后端开发Spring框架之消息介绍 同步异步 JMS AMQP MQTT Kafka介绍
48 0
|
8月前
|
消息中间件 存储 Java
RabbitMQ-同步和异步区别&快速入门
RabbitMQ-同步和异步区别&快速入门
218 1
|
8月前
|
消息中间件 存储 JSON
服务器的异步通信——RabbitMQ2
服务器的异步通信——RabbitMQ
62 0
|
8月前
|
消息中间件 缓存 中间件
服务器的异步通信——RabbitMQ1
服务器的异步通信——RabbitMQ
64 0