一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性

简介: 本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。

1.概述

之前我详解总结过Kafka相关知识点、也分享过RocketMQ是什么、以及两个消息队列组件的区别和各自优缺点,不清楚的可以看看之前我们发布的文章自行了解下,所以今天这里不再做消息队列使用场景之类的赘述了,本文将从入门到实践,全方位带你掌握RocketMQ的核心概念、部署方式、API使用和高级特性。

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

自诞生以来,Apache RocketMQ 凭借其简单的架构、丰富的业务功能和极高的可扩展性,已被企业开发人员和云供应商广泛采用。经过十多年的广泛场景打磨,RocketMQ 已成为金融级可靠业务消息的行业标准,广泛应用于互联网、大数据、移动互联网、物联网等领域。

2.核心概念

众所周知,RocketMQ是参考借鉴Kafka研发出来的,所以很多核心思想和概念几乎是一致的,所以我这里不会一一讲述相关知识概念,主要重点讲述RocketMQ独有的概念思想,在叙述之前先来看看RocketMQ的架构图,RocketMQ相关的组件都可以集群部署,比如说NameServerbroker生产者消费者等,broker集群还可以部署成一主多从,多主多从等模式,下面是一主多从的示例:

  • NameServer:可以认为它是一个轻量级注册中心,类似于zookeeper在kafka中的应用,但是zookeeper相对比较重,所以kafka都在去zookeeper化,NameServer比较轻量,主要用于对RocketMQ集群信息的管理,包brokertopicmessage queue路由信息`等等,可集群部署。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
  • Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息

  • 生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。

  • 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。
  • 消费者组:消费者组是包含使用相同消费行为的消费者的负载均衡组。与作为运行实体的消费者不同,消费者组是逻辑资源。 RocketMQ 初始化消费者组中的多个消费者,以实现消费性能的扩展和高可用性灾难恢复。这和kafka中消费者组概念一样,主题topic被消费者组订阅,但实际上是消费者组中的消费者真正地消息消费。
  • 主题:主题在逻辑上是队列的集合;我们可以发布消息到主题或从主题接收消息。主题是消息的第一级分类
  • Tag:消息标签,消息的第二级分类。可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。消息标签是细粒度的消息分类属性,允许消息在主题级别以下进行细分。消费者通过订阅特定标签来实现消息过滤。这是RocketMQ特有的功能
  • 队列(message queue): 队列是RocketMQ 中用于存储和传输消息的容器,是消息存储的最小单位。RocketMQ 中的主题包含多个队列。这样队列支持水平分区和流式存储。 RocketMQ 的队列模型类似于 Kafka 的分区模型,虽然都是存储消息,但是实现逻辑、使用玩法都是不尽相同的,这在之前分享的文章中有详细总结,可自行查看
  • 消息:消息是RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到RocketMQ 代理。然后,代理根据相关语义将消息传递给消费者。RocketMQ有以下几种消息类型:
    • 普通:普通消息不需要特殊语义,也不与其他普通消息相关联。
    • FIFO:RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。
    • 延迟:您可以指定延迟,使消息仅在延迟时间过去后才对消费者可用,而不是在生产时立即传递消息。
    • 事务:RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性

后面三个概念都是RocketMQ中区别于kafka的不同逻辑知识点,需要重点关注下。

3.环境搭建

为了简单直接,快速上手,我们这里使用单机模式部署,首先要求服务器装有jdk1.8+环境,因为RocketMQ是Java开发的。

这里我按照官网教程安装当前最新版本RocketMQ5.2.0版本,先下载安装包rocketmq-all-5.2.0-bin-release.zip上传服务器,

解压安装包:

unzip rocketmq-all-5.2.0-bin-release.zip

查看安装包:

benchmark  bin  conf  lib  LICENSE  nohup.out  NOTICE  README.md

启动NameServer

nohup sh bin/mqnamesrv &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/namesrv.log

日志输出打印:

2025-06-05 15:56:10 INFO main - The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动 Broker 和 Proxy

ohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/proxy.log

日志输出:

2025-06-05 16:06:42 INFO main - The broker[broker-a, 192.168.231.137:10911] boot success. serializeType=JSON and name server is localhost:9876

按照上面的流程,RocketMQ已经安装好,我们就可以开始发送、消费消息了,但是为了清楚直观,我们再安装一个RocketMQ 可视化控制台,教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/04Dashboard/,比较简单,这里碍于篇幅问题就不赘述了。同时关于集群模式部署教程,也请参考官网教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/01deploy

安装好之后访问路径:http://10.10.0.10:8021,这里我改成端口号为8021,防止默认的端口号8080冲突

4.集成客户端发送、消费消息

Java项目中添加依赖:当前最新版本5.0.7

  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>5.0.7</version>
  </dependency>

4.1 普通消息

普通消息是指在RocketMQ 中没有特殊功能的消息

在控制台先创建普调消息类型的主题topic_normal_test:

编写生产者发送消息代码:

public class ProducerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerNormalExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_normal_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息体
                .setBody("哈哈😄,这是shepherd发的第1条同步消息".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }
        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

异步发送只需要调用异步发送方法即可:

    // 异步发送
    CompletableFuture<SendReceipt> future = producer.sendAsync(message);
    ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
    future.whenCompleteAsync((sendReceipt, throwable) -> {
   
        if (null != throwable) {
   
            logger.error("Failed to send message", throwable);
            return;
        }
        logger.info("Async send message successfully, messageId={}", sendReceipt.getMessageId());
    }, sendCallbackExecutor);

发送成功之后就能在控制台查看到消息了:

消费消息:

public class PushConsumerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(PushConsumerNormalExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }

}

4.2 延迟消息

延迟消息是RocketMQ 中具有高级功能的消息,kafka中没有延迟消息,需要自己写代码实现,简单实现就是创建一个队列topic_delay_30m代表这个队列的消息是30m之后再执行,发送消息的同时写入发送时间,然后通过高频的定时扫描轮训比较消息的发送时间和当前时间是否差值超过了30m,超过就消费消息,听上去是不是挺麻烦,所以RocketMQ就原生提供延迟消息这一功能。

像上面一样先去控制台创建一个延迟消息类型的主题topic_delay_test,这里就不截图展示了

生产者发送延迟消息:

public class ProducerDelayExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerDelayExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_delay_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 设置延迟执行时间
                .setDeliveryTimestamp(System.currentTimeMillis() + 3 * 60 * 1000)
                // 消息体
                .setBody("我要延迟3分钟之后再执行".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send delay message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

这里我们指定3分钟之后执行,注意这里并不像上面的普通消息一样,生产者发送之后在控制台立即可见,需要等待3分钟之后才能可见。也就是消息发送到服务器端,消息是存储在基于时间的存储系统中,直到指定的传递时间,消息不会立即创建索引。等到了指定的时间,消息才被写入常规存储引擎,消息对消费者可见,并等待消费者消费。

消费者消费消息:

public class PushConsumerDelayExample {
   
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerDelayExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_delay_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume delay message successfully, messageView={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }
}

消费消息代码逻辑都一样,所以下面碍于篇幅问题,就不再展示消费代码了

4.3 顺序消息

顺序消息是RocketMQ 中的一种高级消息类型。顺序消息按消息发送的顺序传递给消费者。这种消息类型允许您在业务场景中实现有序处理。RocketMQ 使用消息组来确定顺序消息的顺序。您必须为顺序消息配置消息组。消息组中的消息按先进先出 (FIFO) 顺序处理。消息排序不适用于不同的消息组或不在消息组中的消息。属于同一消息组的消息将按发送顺序存储在同一队列中。比如说可以使用用户 ID 作为消息组关键字来实现同一用户的消息有序处理

RocketMQ 确保消费顺序与队列中的存储顺序相同,也就是只保证在队列级别顺序消费消息,这和kafka只保证在同一个分区内顺序消费思想是一样的,只是kafka通过路由key将消息存储到同一分区,而RocketMQ使用消息组来实现消息存储在同一队列中,一个套路~~~

同样地先去控制台创建一个fifo消息类型的主题topic_fifo_test

生产者生产消息:

public class ProducerFifoExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerFifoExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_fifo_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息组 同一消息组的消息存储在同一队列中,队列内消息保证顺序消费 这里模拟用户id分组来保证同一用户消息顺序消费
                .setMessageGroup("userId::1")
                // 消息体
                .setBody("顺序消息001".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

消费者代码如上面小结所示。

4.4 事务消息

事务消息是RocketMQ 提供的一种高级消息类型,用于确保消息生产与本地事务之间的最终一致性。RocketMQ 的事务消息功能将两阶段提交与本地事务相结合,实现提交结果的全局一致性。

控制台创建一个事务消息类型的主题topic_transaction_test

生产者提交事务消息:

public class ProducerTransactionExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerTransactionExample.class);

    // 模拟校验订单交易是否已提交
    private static boolean checkOrderById(String orderId) {
   
        return true;
    }
    // 模拟本地事务的执行结果
    private static boolean doLocalTransaction() {
   
        return true;
    }
    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_transaction_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建一个事务生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                // 指定事务检查器
                .setTransactionChecker(messageView -> {
   
                    // 获取订单id
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
   
                        // 没有获取到,直接回滚,不提交发送消息
                        return TransactionResolution.ROLLBACK;
                    }
                    // 查询订单交易已提交
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        // Create a transaction branch.
        final Transaction transaction;
        try {
   
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
   
            logger.error("error", e);
            // If the transaction branch fails to be created, the transaction is terminated.
            return;
        }
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("trx-1")
                // 标签
                .setTag("trx-A")
                // 事务消息唯一id,通过该id关联查询本地事务状态
                .addProperty("OrderId", "001")
                // Message body.
                .setBody("这是一条事务消息".getBytes())
                .build();

        try {
   
            // Send a half message.
            final SendReceipt sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
   
            return;
        }
        /**
         * Execute the local transaction and check the execution result.
         * 1. If the result is Commit, deliver the message.
         * 2. If the result is Rollback, roll back the message.
         * 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
   
            try {
   
                transaction.commit();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        } else {
   
            try {
   
                transaction.rollback();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        }
    }

}

这里模拟的是用户提交订单支付之后,发送订单消息给下游服务做处理,这是一个典型的分布式事务场景。

5.消费者类型

RocketMQ 支持以下类型的消费者:PushConsumerSimpleConsumerPullConsumer。这三种消费者类型具有不同的集成和控制方法,您可以使用它们来满足不同业务场景下的消息需求。

注意:在同一个消费者组中混合使用 PullConsumer 和其他消费者类型是严格禁止的

项目 PushConsumer SimpleConsumer PullConsumer
API 操作调用 使用消息监听器调用回调操作以返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。 业务应用程序实现消息处理并调用相应的操作以返回消费结果。 业务应用程序实现消息拉取和处理,并调用相应的操作以返回消费结果。
消费并发管理 Apache RocketMQ SDK 用于管理消息消费的并发线程数。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。
负载均衡机制 5.0 版本中的基于消息的负载均衡,早期版本中的基于队列的负载均衡。 基于消息的负载均衡。 基于队列的负载均衡。
API 灵活性 API 操作被封装,灵活性较差。 原子操作提供了极大的灵活性。 原子操作提供了极大的灵活性。
场景 此消费者类型适用于不需要自定义流程的开发场景。 此消费者类型适用于需要自定义流程的开发场景。 建议仅在流处理框架场景中进行集成

PushConsumer

PushConsumer 是一种高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试由RocketMQ 客户端 SDK 完成。一开始我差点以为它是一条一条消息的消费,这样消费也太慢了吧?设计这个消费者意义不大呀

// 消息监听器,消费消息逻辑所在
.setMessageListener(messageView -> {
   
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    try {
   
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
   
        throw new RuntimeException(e);
    }
    return ConsumeResult.SUCCESS;
})

这里我模拟消费处理一条消息需要5s,看看消费两条消息:

common-demo] [] [2025-06-06 15:11:22.004] [INFO] [RocketmqMessageConsumption-0-18@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846F1408542E5500000000
[common-demo] [] [2025-06-06 15:11:22.005] [INFO] [RocketmqMessageConsumption-0-17@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846E8608542E4000000000

看的出来是单独的线程分别消费了消息。

其实对于 PushConsumer,实时消息处理基于 SDK 的典型 Reactor 线程模型。SDK 具有内置的长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑进行操作。下图显示了 PushConsumer 消费者的消息消费过程。

对于 RocketMQ 中的 fifo 消息,如果为消费者组配置了有序消息消费,PushConsumer 消费者会按消费顺序消费消息。当 PushConsumer 消费者消费消息时,无需在业务逻辑中定义消费顺序,即可确保消费顺序。

SimpleConsumer

SimpleConsumer 是一种支持消息处理原子操作的消费者类型。这种类型的消费者根据业务逻辑调用操作来获取消息、提交消费状态和执行消息重试。SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作来获取消息并将消息分发到业务线程进行处理。然后,调用提交操作以提交消息处理结果:

public class SimpleConsumerExample {
   

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerExample.class);

    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 指定从服务器接收消息时的最大等待时间
                .setAwaitDuration(Duration.ofSeconds(5))
                .build();
        try {
   
            // 一次最多拉取10条,最大处理超时时间30s
            List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
   
                System.out.println(messageView);
                // 消费完成,提交ack
                try {
   
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
   
                    logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
                }
            });
        } catch (ClientException e) {
   
            logger.error("Failed to receive message", e);
        }
    }

}

SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景

  • 不可控的消息处理时长:如果消息处理时长不可估量,建议您使用 SimpleConsumer 来防止消息被处理过长时间。您可以在消息消费期间指定估计的消息处理时长。如果现有处理时长不适合您的业务场景,您可以调用相应的 API 操作更改消息处理时长。
  • 异步处理和批量消费:SimpleConsumer 不涉及 SDK 中的复杂线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者就可以实现异步分发、批量消费和其他自定义场景。
  • 自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作获取消息。您可以调整获取消息的频率以控制消息消费速率。

PullConsumer

自行查资料,官网资料也不多,感觉平时使用的比较少,大家自行掌握即可,我这里不过多叙述了。

6.总结

RocketMQ作为一款成熟的分布式消息中间件,在实战应用中需要注意:

  1. 合理设计Topic和Tag:业务隔离,便于管理
  2. 根据场景选择消息类型:普通消息、顺序消息、事务消息等
  3. 重视监控告警:及时发现消息堆积、消费延迟等问题
  4. 做好容灾预案:Broker宕机、网络分区等异常情况的处理方案
  5. 性能与可靠性平衡:根据业务需求调整刷盘方式、复制策略等

通过本教程的系统学习,相信您已经掌握了RocketMQ的核心概念、部署方式、API使用和高级特性。在实际项目中,建议结合具体业务场景灵活运用这些知识,并持续关注RocketMQ社区的动态和新特性。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
6月前
|
数据采集 机器学习/深度学习 运维
量化合约系统开发架构入门
量化合约系统核心在于数据、策略、风控与执行四大模块的协同,构建从数据到决策再到执行的闭环工作流。强调可追溯、可复现与可观测性,避免常见误区如重回测轻验证、忽视数据质量或滞后风控。初学者应以MVP为起点,结合回测框架与实时风控实践,逐步迭代。详见相关入门与实战资料。
|
消息中间件 大数据 关系型数据库
RocketMQ实战—3.基于RocketMQ升级订单系统架构
本文主要介绍了基于MQ实现订单系统核心流程的异步化改造、基于MQ实现订单系统和第三方系统的解耦、基于MQ实现将订单数据同步给大数据团队、秒杀系统的技术难点以及秒杀商详页的架构设计和基于MQ实现秒杀系统的异步化架构。
831 64
RocketMQ实战—3.基于RocketMQ升级订单系统架构
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
7月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2615 1
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
消息中间件 存储 Kafka
RocketMQ实战—4.消息零丢失的方案
本文分析了用户支付完成后未收到红包的问题,深入探讨了RocketMQ事务消息机制的实现原理及其在确保消息零丢失中的作用。首先,通过全链路分析发现消息可能在推送、存储或消费环节丢失。接着,介绍了RocketMQ事务消息机制如何通过half消息、本地事务执行及回调确认来保证消息发送成功,并详细解析了其底层原理,如half消息对消费者不可见、rollback与commit操作等。同时,对比了同步重试方案,指出其在复杂场景下的局限性。
RocketMQ实战—4.消息零丢失的方案

热门文章

最新文章