一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性

简介: 本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。

1.概述

之前我详解总结过Kafka相关知识点、也分享过RocketMQ是什么、以及两个消息队列组件的区别和各自优缺点,不清楚的可以看看之前我们发布的文章自行了解下,所以今天这里不再做消息队列使用场景之类的赘述了,本文将从入门到实践,全方位带你掌握RocketMQ的核心概念、部署方式、API使用和高级特性。

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

自诞生以来,Apache RocketMQ 凭借其简单的架构、丰富的业务功能和极高的可扩展性,已被企业开发人员和云供应商广泛采用。经过十多年的广泛场景打磨,RocketMQ 已成为金融级可靠业务消息的行业标准,广泛应用于互联网、大数据、移动互联网、物联网等领域。

2.核心概念

众所周知,RocketMQ是参考借鉴Kafka研发出来的,所以很多核心思想和概念几乎是一致的,所以我这里不会一一讲述相关知识概念,主要重点讲述RocketMQ独有的概念思想,在叙述之前先来看看RocketMQ的架构图,RocketMQ相关的组件都可以集群部署,比如说NameServerbroker生产者消费者等,broker集群还可以部署成一主多从,多主多从等模式,下面是一主多从的示例:

  • NameServer:可以认为它是一个轻量级注册中心,类似于zookeeper在kafka中的应用,但是zookeeper相对比较重,所以kafka都在去zookeeper化,NameServer比较轻量,主要用于对RocketMQ集群信息的管理,包brokertopicmessage queue路由信息`等等,可集群部署。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
  • Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息

  • 生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。

  • 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。
  • 消费者组:消费者组是包含使用相同消费行为的消费者的负载均衡组。与作为运行实体的消费者不同,消费者组是逻辑资源。 RocketMQ 初始化消费者组中的多个消费者,以实现消费性能的扩展和高可用性灾难恢复。这和kafka中消费者组概念一样,主题topic被消费者组订阅,但实际上是消费者组中的消费者真正地消息消费。
  • 主题:主题在逻辑上是队列的集合;我们可以发布消息到主题或从主题接收消息。主题是消息的第一级分类
  • Tag:消息标签,消息的第二级分类。可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。消息标签是细粒度的消息分类属性,允许消息在主题级别以下进行细分。消费者通过订阅特定标签来实现消息过滤。这是RocketMQ特有的功能
  • 队列(message queue): 队列是RocketMQ 中用于存储和传输消息的容器,是消息存储的最小单位。RocketMQ 中的主题包含多个队列。这样队列支持水平分区和流式存储。 RocketMQ 的队列模型类似于 Kafka 的分区模型,虽然都是存储消息,但是实现逻辑、使用玩法都是不尽相同的,这在之前分享的文章中有详细总结,可自行查看
  • 消息:消息是RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到RocketMQ 代理。然后,代理根据相关语义将消息传递给消费者。RocketMQ有以下几种消息类型:
    • 普通:普通消息不需要特殊语义,也不与其他普通消息相关联。
    • FIFO:RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。
    • 延迟:您可以指定延迟,使消息仅在延迟时间过去后才对消费者可用,而不是在生产时立即传递消息。
    • 事务:RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性

后面三个概念都是RocketMQ中区别于kafka的不同逻辑知识点,需要重点关注下。

3.环境搭建

为了简单直接,快速上手,我们这里使用单机模式部署,首先要求服务器装有jdk1.8+环境,因为RocketMQ是Java开发的。

这里我按照官网教程安装当前最新版本RocketMQ5.2.0版本,先下载安装包rocketmq-all-5.2.0-bin-release.zip上传服务器,

解压安装包:

unzip rocketmq-all-5.2.0-bin-release.zip

查看安装包:

benchmark  bin  conf  lib  LICENSE  nohup.out  NOTICE  README.md

启动NameServer

nohup sh bin/mqnamesrv &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/namesrv.log

日志输出打印:

2025-06-05 15:56:10 INFO main - The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动 Broker 和 Proxy

ohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/proxy.log

日志输出:

2025-06-05 16:06:42 INFO main - The broker[broker-a, 192.168.231.137:10911] boot success. serializeType=JSON and name server is localhost:9876

按照上面的流程,RocketMQ已经安装好,我们就可以开始发送、消费消息了,但是为了清楚直观,我们再安装一个RocketMQ 可视化控制台,教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/04Dashboard/,比较简单,这里碍于篇幅问题就不赘述了。同时关于集群模式部署教程,也请参考官网教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/01deploy

安装好之后访问路径:http://10.10.0.10:8021,这里我改成端口号为8021,防止默认的端口号8080冲突

4.集成客户端发送、消费消息

Java项目中添加依赖:当前最新版本5.0.7

  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>5.0.7</version>
  </dependency>

4.1 普通消息

普通消息是指在RocketMQ 中没有特殊功能的消息

在控制台先创建普调消息类型的主题topic_normal_test:

编写生产者发送消息代码:

public class ProducerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerNormalExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_normal_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息体
                .setBody("哈哈😄,这是shepherd发的第1条同步消息".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }
        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

异步发送只需要调用异步发送方法即可:

    // 异步发送
    CompletableFuture<SendReceipt> future = producer.sendAsync(message);
    ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
    future.whenCompleteAsync((sendReceipt, throwable) -> {
   
        if (null != throwable) {
   
            logger.error("Failed to send message", throwable);
            return;
        }
        logger.info("Async send message successfully, messageId={}", sendReceipt.getMessageId());
    }, sendCallbackExecutor);

发送成功之后就能在控制台查看到消息了:

消费消息:

public class PushConsumerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(PushConsumerNormalExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }

}

4.2 延迟消息

延迟消息是RocketMQ 中具有高级功能的消息,kafka中没有延迟消息,需要自己写代码实现,简单实现就是创建一个队列topic_delay_30m代表这个队列的消息是30m之后再执行,发送消息的同时写入发送时间,然后通过高频的定时扫描轮训比较消息的发送时间和当前时间是否差值超过了30m,超过就消费消息,听上去是不是挺麻烦,所以RocketMQ就原生提供延迟消息这一功能。

像上面一样先去控制台创建一个延迟消息类型的主题topic_delay_test,这里就不截图展示了

生产者发送延迟消息:

public class ProducerDelayExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerDelayExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_delay_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 设置延迟执行时间
                .setDeliveryTimestamp(System.currentTimeMillis() + 3 * 60 * 1000)
                // 消息体
                .setBody("我要延迟3分钟之后再执行".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send delay message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

这里我们指定3分钟之后执行,注意这里并不像上面的普通消息一样,生产者发送之后在控制台立即可见,需要等待3分钟之后才能可见。也就是消息发送到服务器端,消息是存储在基于时间的存储系统中,直到指定的传递时间,消息不会立即创建索引。等到了指定的时间,消息才被写入常规存储引擎,消息对消费者可见,并等待消费者消费。

消费者消费消息:

public class PushConsumerDelayExample {
   
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerDelayExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_delay_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume delay message successfully, messageView={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }
}

消费消息代码逻辑都一样,所以下面碍于篇幅问题,就不再展示消费代码了

4.3 顺序消息

顺序消息是RocketMQ 中的一种高级消息类型。顺序消息按消息发送的顺序传递给消费者。这种消息类型允许您在业务场景中实现有序处理。RocketMQ 使用消息组来确定顺序消息的顺序。您必须为顺序消息配置消息组。消息组中的消息按先进先出 (FIFO) 顺序处理。消息排序不适用于不同的消息组或不在消息组中的消息。属于同一消息组的消息将按发送顺序存储在同一队列中。比如说可以使用用户 ID 作为消息组关键字来实现同一用户的消息有序处理

RocketMQ 确保消费顺序与队列中的存储顺序相同,也就是只保证在队列级别顺序消费消息,这和kafka只保证在同一个分区内顺序消费思想是一样的,只是kafka通过路由key将消息存储到同一分区,而RocketMQ使用消息组来实现消息存储在同一队列中,一个套路~~~

同样地先去控制台创建一个fifo消息类型的主题topic_fifo_test

生产者生产消息:

public class ProducerFifoExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerFifoExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_fifo_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息组 同一消息组的消息存储在同一队列中,队列内消息保证顺序消费 这里模拟用户id分组来保证同一用户消息顺序消费
                .setMessageGroup("userId::1")
                // 消息体
                .setBody("顺序消息001".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

消费者代码如上面小结所示。

4.4 事务消息

事务消息是RocketMQ 提供的一种高级消息类型,用于确保消息生产与本地事务之间的最终一致性。RocketMQ 的事务消息功能将两阶段提交与本地事务相结合,实现提交结果的全局一致性。

控制台创建一个事务消息类型的主题topic_transaction_test

生产者提交事务消息:

public class ProducerTransactionExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerTransactionExample.class);

    // 模拟校验订单交易是否已提交
    private static boolean checkOrderById(String orderId) {
   
        return true;
    }
    // 模拟本地事务的执行结果
    private static boolean doLocalTransaction() {
   
        return true;
    }
    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_transaction_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建一个事务生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                // 指定事务检查器
                .setTransactionChecker(messageView -> {
   
                    // 获取订单id
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
   
                        // 没有获取到,直接回滚,不提交发送消息
                        return TransactionResolution.ROLLBACK;
                    }
                    // 查询订单交易已提交
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        // Create a transaction branch.
        final Transaction transaction;
        try {
   
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
   
            logger.error("error", e);
            // If the transaction branch fails to be created, the transaction is terminated.
            return;
        }
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("trx-1")
                // 标签
                .setTag("trx-A")
                // 事务消息唯一id,通过该id关联查询本地事务状态
                .addProperty("OrderId", "001")
                // Message body.
                .setBody("这是一条事务消息".getBytes())
                .build();

        try {
   
            // Send a half message.
            final SendReceipt sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
   
            return;
        }
        /**
         * Execute the local transaction and check the execution result.
         * 1. If the result is Commit, deliver the message.
         * 2. If the result is Rollback, roll back the message.
         * 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
   
            try {
   
                transaction.commit();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        } else {
   
            try {
   
                transaction.rollback();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        }
    }

}

这里模拟的是用户提交订单支付之后,发送订单消息给下游服务做处理,这是一个典型的分布式事务场景。

5.消费者类型

RocketMQ 支持以下类型的消费者:PushConsumerSimpleConsumerPullConsumer。这三种消费者类型具有不同的集成和控制方法,您可以使用它们来满足不同业务场景下的消息需求。

注意:在同一个消费者组中混合使用 PullConsumer 和其他消费者类型是严格禁止的

项目 PushConsumer SimpleConsumer PullConsumer
API 操作调用 使用消息监听器调用回调操作以返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。 业务应用程序实现消息处理并调用相应的操作以返回消费结果。 业务应用程序实现消息拉取和处理,并调用相应的操作以返回消费结果。
消费并发管理 Apache RocketMQ SDK 用于管理消息消费的并发线程数。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。
负载均衡机制 5.0 版本中的基于消息的负载均衡,早期版本中的基于队列的负载均衡。 基于消息的负载均衡。 基于队列的负载均衡。
API 灵活性 API 操作被封装,灵活性较差。 原子操作提供了极大的灵活性。 原子操作提供了极大的灵活性。
场景 此消费者类型适用于不需要自定义流程的开发场景。 此消费者类型适用于需要自定义流程的开发场景。 建议仅在流处理框架场景中进行集成

PushConsumer

PushConsumer 是一种高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试由RocketMQ 客户端 SDK 完成。一开始我差点以为它是一条一条消息的消费,这样消费也太慢了吧?设计这个消费者意义不大呀

// 消息监听器,消费消息逻辑所在
.setMessageListener(messageView -> {
   
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    try {
   
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
   
        throw new RuntimeException(e);
    }
    return ConsumeResult.SUCCESS;
})

这里我模拟消费处理一条消息需要5s,看看消费两条消息:

common-demo] [] [2025-06-06 15:11:22.004] [INFO] [RocketmqMessageConsumption-0-18@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846F1408542E5500000000
[common-demo] [] [2025-06-06 15:11:22.005] [INFO] [RocketmqMessageConsumption-0-17@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846E8608542E4000000000

看的出来是单独的线程分别消费了消息。

其实对于 PushConsumer,实时消息处理基于 SDK 的典型 Reactor 线程模型。SDK 具有内置的长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑进行操作。下图显示了 PushConsumer 消费者的消息消费过程。

对于 RocketMQ 中的 fifo 消息,如果为消费者组配置了有序消息消费,PushConsumer 消费者会按消费顺序消费消息。当 PushConsumer 消费者消费消息时,无需在业务逻辑中定义消费顺序,即可确保消费顺序。

SimpleConsumer

SimpleConsumer 是一种支持消息处理原子操作的消费者类型。这种类型的消费者根据业务逻辑调用操作来获取消息、提交消费状态和执行消息重试。SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作来获取消息并将消息分发到业务线程进行处理。然后,调用提交操作以提交消息处理结果:

public class SimpleConsumerExample {
   

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerExample.class);

    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 指定从服务器接收消息时的最大等待时间
                .setAwaitDuration(Duration.ofSeconds(5))
                .build();
        try {
   
            // 一次最多拉取10条,最大处理超时时间30s
            List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
   
                System.out.println(messageView);
                // 消费完成,提交ack
                try {
   
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
   
                    logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
                }
            });
        } catch (ClientException e) {
   
            logger.error("Failed to receive message", e);
        }
    }

}

SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景

  • 不可控的消息处理时长:如果消息处理时长不可估量,建议您使用 SimpleConsumer 来防止消息被处理过长时间。您可以在消息消费期间指定估计的消息处理时长。如果现有处理时长不适合您的业务场景,您可以调用相应的 API 操作更改消息处理时长。
  • 异步处理和批量消费:SimpleConsumer 不涉及 SDK 中的复杂线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者就可以实现异步分发、批量消费和其他自定义场景。
  • 自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作获取消息。您可以调整获取消息的频率以控制消息消费速率。

PullConsumer

自行查资料,官网资料也不多,感觉平时使用的比较少,大家自行掌握即可,我这里不过多叙述了。

6.总结

RocketMQ作为一款成熟的分布式消息中间件,在实战应用中需要注意:

  1. 合理设计Topic和Tag:业务隔离,便于管理
  2. 根据场景选择消息类型:普通消息、顺序消息、事务消息等
  3. 重视监控告警:及时发现消息堆积、消费延迟等问题
  4. 做好容灾预案:Broker宕机、网络分区等异常情况的处理方案
  5. 性能与可靠性平衡:根据业务需求调整刷盘方式、复制策略等

通过本教程的系统学习,相信您已经掌握了RocketMQ的核心概念、部署方式、API使用和高级特性。在实际项目中,建议结合具体业务场景灵活运用这些知识,并持续关注RocketMQ社区的动态和新特性。

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
24天前
|
人工智能 JavaScript 前端开发
GenSX (不一样的AI应用框架)架构学习指南
GenSX 是一个基于 TypeScript 的函数式 AI 工作流框架,以“函数组合替代图编排”为核心理念。它通过纯函数组件、自动追踪与断点恢复等特性,让开发者用自然代码构建可追溯、易测试的 LLM 应用。支持多模型集成与插件化扩展,兼具灵活性与工程化优势。
104 6
|
2月前
|
人工智能 Cloud Native 中间件
划重点|云栖大会「AI 原生应用架构论坛」看点梳理
本场论坛将系统性阐述 AI 原生应用架构的新范式、演进趋势与技术突破,并分享来自真实生产环境下的一线实践经验与思考。
|
2月前
|
机器学习/深度学习 人工智能 vr&ar
H4H:面向AR/VR应用的NPU-CIM异构系统混合卷积-Transformer架构搜索——论文阅读
H4H是一种面向AR/VR应用的混合卷积-Transformer架构,基于NPU-CIM异构系统,通过神经架构搜索实现高效模型设计。该架构结合卷积神经网络(CNN)的局部特征提取与视觉Transformer(ViT)的全局信息处理能力,提升模型性能与效率。通过两阶段增量训练策略,缓解混合模型训练中的梯度冲突问题,并利用异构计算资源优化推理延迟与能耗。实验表明,H4H在相同准确率下显著降低延迟和功耗,为AR/VR设备上的边缘AI推理提供了高效解决方案。
290 0
|
30天前
|
机器学习/深度学习 自然语言处理 算法
48_动态架构模型:NAS在LLM中的应用
大型语言模型(LLM)在自然语言处理领域的突破性进展,很大程度上归功于其庞大的参数量和复杂的网络架构。然而,随着模型规模的不断增长,计算资源消耗、推理延迟和部署成本等问题日益凸显。如何在保持模型性能的同时,优化模型架构以提高效率,成为2025年大模型研究的核心方向之一。神经架构搜索(Neural Architecture Search, NAS)作为一种自动化的网络设计方法,正在为这一挑战提供创新性解决方案。本文将深入探讨NAS技术如何应用于LLM的架构优化,特别是在层数与维度调整方面的最新进展,并通过代码实现展示简单的NAS实验。
|
4月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
170 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
816 103
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
329 117
|
12月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
191 1

热门文章

最新文章

下一篇
开通oss服务