一文带你从入门到实战全面掌握RocketMQ核心概念、架构部署、实践应用和高级特性

简介: 本文详细介绍了分布式消息中间件RocketMQ的核心概念、部署方式及使用方法。RocketMQ由阿里研发并开源,具有高性能、高可靠性和分布式特性,广泛应用于金融、互联网等领域。文章从环境搭建到消息类型的实战(普通消息、延迟消息、顺序消息和事务消息)进行了全面解析,并对比了三种消费者类型(PushConsumer、SimpleConsumer和PullConsumer)的特点与适用场景。最后总结了使用RocketMQ时的关键注意事项,如Topic和Tag的设计、监控告警的重要性以及性能与可靠性的平衡。通过学习本文,读者可掌握RocketMQ的使用精髓并灵活应用于实际项目中。

1.概述

之前我详解总结过Kafka相关知识点、也分享过RocketMQ是什么、以及两个消息队列组件的区别和各自优缺点,不清楚的可以看看之前我们发布的文章自行了解下,所以今天这里不再做消息队列使用场景之类的赘述了,本文将从入门到实践,全方位带你掌握RocketMQ的核心概念、部署方式、API使用和高级特性。

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。

自诞生以来,Apache RocketMQ 凭借其简单的架构、丰富的业务功能和极高的可扩展性,已被企业开发人员和云供应商广泛采用。经过十多年的广泛场景打磨,RocketMQ 已成为金融级可靠业务消息的行业标准,广泛应用于互联网、大数据、移动互联网、物联网等领域。

2.核心概念

众所周知,RocketMQ是参考借鉴Kafka研发出来的,所以很多核心思想和概念几乎是一致的,所以我这里不会一一讲述相关知识概念,主要重点讲述RocketMQ独有的概念思想,在叙述之前先来看看RocketMQ的架构图,RocketMQ相关的组件都可以集群部署,比如说NameServerbroker生产者消费者等,broker集群还可以部署成一主多从,多主多从等模式,下面是一主多从的示例:

  • NameServer:可以认为它是一个轻量级注册中心,类似于zookeeper在kafka中的应用,但是zookeeper相对比较重,所以kafka都在去zookeeper化,NameServer比较轻量,主要用于对RocketMQ集群信息的管理,包brokertopicmessage queue路由信息`等等,可集群部署。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
  • Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己注册至Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息

  • 生产者:与Name Server集群中的其中一个节点(随机)建立长连接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。

  • 消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。
  • 消费者组:消费者组是包含使用相同消费行为的消费者的负载均衡组。与作为运行实体的消费者不同,消费者组是逻辑资源。 RocketMQ 初始化消费者组中的多个消费者,以实现消费性能的扩展和高可用性灾难恢复。这和kafka中消费者组概念一样,主题topic被消费者组订阅,但实际上是消费者组中的消费者真正地消息消费。
  • 主题:主题在逻辑上是队列的集合;我们可以发布消息到主题或从主题接收消息。主题是消息的第一级分类
  • Tag:消息标签,消息的第二级分类。可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。消息标签是细粒度的消息分类属性,允许消息在主题级别以下进行细分。消费者通过订阅特定标签来实现消息过滤。这是RocketMQ特有的功能
  • 队列(message queue): 队列是RocketMQ 中用于存储和传输消息的容器,是消息存储的最小单位。RocketMQ 中的主题包含多个队列。这样队列支持水平分区和流式存储。 RocketMQ 的队列模型类似于 Kafka 的分区模型,虽然都是存储消息,但是实现逻辑、使用玩法都是不尽相同的,这在之前分享的文章中有详细总结,可自行查看
  • 消息:消息是RocketMQ 中数据传输的最小单位。生产者将业务数据的负载和扩展属性封装成消息,并将消息发送到RocketMQ 代理。然后,代理根据相关语义将消息传递给消费者。RocketMQ有以下几种消息类型:
    • 普通:普通消息不需要特殊语义,也不与其他普通消息相关联。
    • FIFO:RocketMQ 使用消息组来确定一组指定消息的顺序。消息按发送顺序传递。
    • 延迟:您可以指定延迟,使消息仅在延迟时间过去后才对消费者可用,而不是在生产时立即传递消息。
    • 事务:RocketMQ 支持分布式事务消息,并确保数据库更新和消息调用的事务一致性

后面三个概念都是RocketMQ中区别于kafka的不同逻辑知识点,需要重点关注下。

3.环境搭建

为了简单直接,快速上手,我们这里使用单机模式部署,首先要求服务器装有jdk1.8+环境,因为RocketMQ是Java开发的。

这里我按照官网教程安装当前最新版本RocketMQ5.2.0版本,先下载安装包rocketmq-all-5.2.0-bin-release.zip上传服务器,

解压安装包:

unzip rocketmq-all-5.2.0-bin-release.zip

查看安装包:

benchmark  bin  conf  lib  LICENSE  nohup.out  NOTICE  README.md

启动NameServer

nohup sh bin/mqnamesrv &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/namesrv.log

日志输出打印:

2025-06-05 15:56:10 INFO main - The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

启动 Broker 和 Proxy

ohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

验证是否启动成功:

tail -f ~/logs/rocketmqlogs/proxy.log

日志输出:

2025-06-05 16:06:42 INFO main - The broker[broker-a, 192.168.231.137:10911] boot success. serializeType=JSON and name server is localhost:9876

按照上面的流程,RocketMQ已经安装好,我们就可以开始发送、消费消息了,但是为了清楚直观,我们再安装一个RocketMQ 可视化控制台,教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/04Dashboard/,比较简单,这里碍于篇幅问题就不赘述了。同时关于集群模式部署教程,也请参考官网教程:https://rocketmq.apache.ac.cn/docs/deploymentOperations/01deploy

安装好之后访问路径:http://10.10.0.10:8021,这里我改成端口号为8021,防止默认的端口号8080冲突

4.集成客户端发送、消费消息

Java项目中添加依赖:当前最新版本5.0.7

  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client-java</artifactId>
      <version>5.0.7</version>
  </dependency>

4.1 普通消息

普通消息是指在RocketMQ 中没有特殊功能的消息

在控制台先创建普调消息类型的主题topic_normal_test:

编写生产者发送消息代码:

public class ProducerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerNormalExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_normal_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息体
                .setBody("哈哈😄,这是shepherd发的第1条同步消息".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }
        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

异步发送只需要调用异步发送方法即可:

    // 异步发送
    CompletableFuture<SendReceipt> future = producer.sendAsync(message);
    ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
    future.whenCompleteAsync((sendReceipt, throwable) -> {
   
        if (null != throwable) {
   
            logger.error("Failed to send message", throwable);
            return;
        }
        logger.info("Async send message successfully, messageId={}", sendReceipt.getMessageId());
    }, sendCallbackExecutor);

发送成功之后就能在控制台查看到消息了:

消费消息:

public class PushConsumerNormalExample {
   

    private static final Logger logger = LoggerFactory.getLogger(PushConsumerNormalExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }

}

4.2 延迟消息

延迟消息是RocketMQ 中具有高级功能的消息,kafka中没有延迟消息,需要自己写代码实现,简单实现就是创建一个队列topic_delay_30m代表这个队列的消息是30m之后再执行,发送消息的同时写入发送时间,然后通过高频的定时扫描轮训比较消息的发送时间和当前时间是否差值超过了30m,超过就消费消息,听上去是不是挺麻烦,所以RocketMQ就原生提供延迟消息这一功能。

像上面一样先去控制台创建一个延迟消息类型的主题topic_delay_test,这里就不截图展示了

生产者发送延迟消息:

public class ProducerDelayExample {
   

    private static final Logger logger = LoggerFactory.getLogger(ProducerDelayExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_delay_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 设置延迟执行时间
                .setDeliveryTimestamp(System.currentTimeMillis() + 3 * 60 * 1000)
                // 消息体
                .setBody("我要延迟3分钟之后再执行".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send delay message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send delay message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

这里我们指定3分钟之后执行,注意这里并不像上面的普通消息一样,生产者发送之后在控制台立即可见,需要等待3分钟之后才能可见。也就是消息发送到服务器端,消息是存储在基于时间的存储系统中,直到指定的传递时间,消息不会立即创建索引。等到了指定的时间,消息才被写入常规存储引擎,消息对消费者可见,并等待消费者消费。

消费者消费消息:

public class PushConsumerDelayExample {
   
    private static final Logger logger = LoggerFactory.getLogger(PushConsumerDelayExample.class);

    public static void main(String[] args) throws IOException, InterruptedException, ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_delay_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // 设置消费者组
                .setConsumerGroup(consumerGroup)
                // 设置过滤
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 消息监听器,消费消息逻辑所在
                .setMessageListener(messageView -> {
   
                    logger.info("Consume delay message successfully, messageView={}", messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // 睡眠,让消费者一直监听、消费消息
        Thread.sleep(Long.MAX_VALUE);
        // 消费者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//         pushConsumer.close();
    }
}

消费消息代码逻辑都一样,所以下面碍于篇幅问题,就不再展示消费代码了

4.3 顺序消息

顺序消息是RocketMQ 中的一种高级消息类型。顺序消息按消息发送的顺序传递给消费者。这种消息类型允许您在业务场景中实现有序处理。RocketMQ 使用消息组来确定顺序消息的顺序。您必须为顺序消息配置消息组。消息组中的消息按先进先出 (FIFO) 顺序处理。消息排序不适用于不同的消息组或不在消息组中的消息。属于同一消息组的消息将按发送顺序存储在同一队列中。比如说可以使用用户 ID 作为消息组关键字来实现同一用户的消息有序处理

RocketMQ 确保消费顺序与队列中的存储顺序相同,也就是只保证在队列级别顺序消费消息,这和kafka只保证在同一个分区内顺序消费思想是一样的,只是kafka通过路由key将消息存储到同一分区,而RocketMQ使用消息组来实现消息存储在同一队列中,一个套路~~~

同样地先去控制台创建一个fifo消息类型的主题topic_fifo_test

生产者生产消息:

public class ProducerFifoExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerFifoExample.class);

    public static void main(String[] args) throws ClientException, IOException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_fifo_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                .build();
        // 构建消息
        Message message = provider.newMessageBuilder()
                // 设置消息发送到的主题
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("key-1")
                // 标签,消息的二级分类,消费者可以标签过滤消费特定消息
                .setTag("tag-A")
                // 消息组 同一消息组的消息存储在同一队列中,队列内消息保证顺序消费 这里模拟用户id分组来保证同一用户消息顺序消费
                .setMessageGroup("userId::1")
                // 消息体
                .setBody("顺序消息001".getBytes())
                .build();
        try {
   
            // 同步发送
            SendReceipt sendReceipt = producer.send(message);
            // 打印消息id
            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
   
            logger.error("Failed to send message", e);
        }

        // 生产者是底层资源,可以像数据库的连接池一样重用, 不能频繁的创建或者销毁
        // 你可以在不再需要它的时候销毁它,也可以让它随着jvm一起销毁
//        producer.close();
    }
}

消费者代码如上面小结所示。

4.4 事务消息

事务消息是RocketMQ 提供的一种高级消息类型,用于确保消息生产与本地事务之间的最终一致性。RocketMQ 的事务消息功能将两阶段提交与本地事务相结合,实现提交结果的全局一致性。

控制台创建一个事务消息类型的主题topic_transaction_test

生产者提交事务消息:

public class ProducerTransactionExample {
   
    private static final Logger logger = LoggerFactory.getLogger(ProducerTransactionExample.class);

    // 模拟校验订单交易是否已提交
    private static boolean checkOrderById(String orderId) {
   
        return true;
    }
    // 模拟本地事务的执行结果
    private static boolean doLocalTransaction() {
   
        return true;
    }
    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoint = "10.10.0.10:8081";
        // 主题名称 主题需要提前创建
        String topic = "topic_transaction_test";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // 创建一个事务生产者
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(configuration)
                // 指定事务检查器
                .setTransactionChecker(messageView -> {
   
                    // 获取订单id
                    final String orderId = messageView.getProperties().get("OrderId");
                    if (Strings.isNullOrEmpty(orderId)) {
   
                        // 没有获取到,直接回滚,不提交发送消息
                        return TransactionResolution.ROLLBACK;
                    }
                    // 查询订单交易已提交
                    return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
                })
                .build();
        // Create a transaction branch.
        final Transaction transaction;
        try {
   
            transaction = producer.beginTransaction();
        } catch (ClientException e) {
   
            logger.error("error", e);
            // If the transaction branch fails to be created, the transaction is terminated.
            return;
        }
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // 消息key,方便你后续根据消息key快速查找和追踪消息,一般用消息体里面的主键 比如说订单id或者用户id
                .setKeys("trx-1")
                // 标签
                .setTag("trx-A")
                // 事务消息唯一id,通过该id关联查询本地事务状态
                .addProperty("OrderId", "001")
                // Message body.
                .setBody("这是一条事务消息".getBytes())
                .build();

        try {
   
            // Send a half message.
            final SendReceipt sendReceipt = producer.send(message, transaction);
        } catch (ClientException e) {
   
            return;
        }
        /**
         * Execute the local transaction and check the execution result.
         * 1. If the result is Commit, deliver the message.
         * 2. If the result is Rollback, roll back the message.
         * 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
         *
         */
        boolean localTransactionOk = doLocalTransaction();
        if (localTransactionOk) {
   
            try {
   
                transaction.commit();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        } else {
   
            try {
   
                transaction.rollback();
            } catch (ClientException e) {
   
                logger.error("error", e);
            }
        }
    }

}

这里模拟的是用户提交订单支付之后,发送订单消息给下游服务做处理,这是一个典型的分布式事务场景。

5.消费者类型

RocketMQ 支持以下类型的消费者:PushConsumerSimpleConsumerPullConsumer。这三种消费者类型具有不同的集成和控制方法,您可以使用它们来满足不同业务场景下的消息需求。

注意:在同一个消费者组中混合使用 PullConsumer 和其他消费者类型是严格禁止的

项目 PushConsumer SimpleConsumer PullConsumer
API 操作调用 使用消息监听器调用回调操作以返回消费结果。消费者只能在消息监听器的范围内处理消费逻辑。 业务应用程序实现消息处理并调用相应的操作以返回消费结果。 业务应用程序实现消息拉取和处理,并调用相应的操作以返回消费结果。
消费并发管理 Apache RocketMQ SDK 用于管理消息消费的并发线程数。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。 用于消息消费的并发线程数基于各个业务应用程序的消费逻辑。
负载均衡机制 5.0 版本中的基于消息的负载均衡,早期版本中的基于队列的负载均衡。 基于消息的负载均衡。 基于队列的负载均衡。
API 灵活性 API 操作被封装,灵活性较差。 原子操作提供了极大的灵活性。 原子操作提供了极大的灵活性。
场景 此消费者类型适用于不需要自定义流程的开发场景。 此消费者类型适用于需要自定义流程的开发场景。 建议仅在流处理框架场景中进行集成

PushConsumer

PushConsumer 是一种高度封装的消费者类型。消息消费和消费结果提交仅通过消息监听器进行处理。消息获取、消费状态提交和消费重试由RocketMQ 客户端 SDK 完成。一开始我差点以为它是一条一条消息的消费,这样消费也太慢了吧?设计这个消费者意义不大呀

// 消息监听器,消费消息逻辑所在
.setMessageListener(messageView -> {
   
    logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
    try {
   
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
   
        throw new RuntimeException(e);
    }
    return ConsumeResult.SUCCESS;
})

这里我模拟消费处理一条消息需要5s,看看消费两条消息:

common-demo] [] [2025-06-06 15:11:22.004] [INFO] [RocketmqMessageConsumption-0-18@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846F1408542E5500000000
[common-demo] [] [2025-06-06 15:11:22.005] [INFO] [RocketmqMessageConsumption-0-17@PID_IS_UNDEFINED] com.shepherd.basedemo.rocketmq.PushConsumerNormalExample lambda$main$0: Consume message successfully, messageId=01FE46DDDDBE846E8608542E4000000000

看的出来是单独的线程分别消费了消息。

其实对于 PushConsumer,实时消息处理基于 SDK 的典型 Reactor 线程模型。SDK 具有内置的长轮询线程,该线程拉取消息并将消息存储到队列中。然后,消息从队列传递到各个消息消费线程。消息监听器根据消息消费逻辑进行操作。下图显示了 PushConsumer 消费者的消息消费过程。

对于 RocketMQ 中的 fifo 消息,如果为消费者组配置了有序消息消费,PushConsumer 消费者会按消费顺序消费消息。当 PushConsumer 消费者消费消息时,无需在业务逻辑中定义消费顺序,即可确保消费顺序。

SimpleConsumer

SimpleConsumer 是一种支持消息处理原子操作的消费者类型。这种类型的消费者根据业务逻辑调用操作来获取消息、提交消费状态和执行消息重试。SimpleConsumer 涉及多个 API 操作。根据需要调用相应的操作来获取消息并将消息分发到业务线程进行处理。然后,调用提交操作以提交消息处理结果:

public class SimpleConsumerExample {
   

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerExample.class);

    public static void main(String[] args) throws ClientException {
   
        // 代理地址
        String endpoints = "10.10.0.10:8081";
        // 消费者组
        String consumerGroup = "shepherd-consumer-group-01";
        // 消费的主题名称
        String topic = "topic_normal_test";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 客户端配置
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        String tag = "*";
        // 根据tag过滤消费消息  *代表消费所有
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // 指定从服务器接收消息时的最大等待时间
                .setAwaitDuration(Duration.ofSeconds(5))
                .build();
        try {
   
            // 一次最多拉取10条,最大处理超时时间30s
            List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
   
                System.out.println(messageView);
                // 消费完成,提交ack
                try {
   
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
   
                    logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
                }
            });
        } catch (ClientException e) {
   
            logger.error("Failed to receive message", e);
        }
    }

}

SimpleConsumer 提供原子 API 操作来获取消息并提交消费结果。与 PushConsumer 相比,SimpleConsumer 提供了更好的灵活性。SimpleConsumer 适用于以下场景

  • 不可控的消息处理时长:如果消息处理时长不可估量,建议您使用 SimpleConsumer 来防止消息被处理过长时间。您可以在消息消费期间指定估计的消息处理时长。如果现有处理时长不适合您的业务场景,您可以调用相应的 API 操作更改消息处理时长。
  • 异步处理和批量消费:SimpleConsumer 不涉及 SDK 中的复杂线程封装。业务应用程序可以使用自定义设置。这样,SimpleConsumer 消费者就可以实现异步分发、批量消费和其他自定义场景。
  • 自定义消息消费速率:使用 SimpleConsumer 时,业务应用程序调用 ReceiveMessage 操作获取消息。您可以调整获取消息的频率以控制消息消费速率。

PullConsumer

自行查资料,官网资料也不多,感觉平时使用的比较少,大家自行掌握即可,我这里不过多叙述了。

6.总结

RocketMQ作为一款成熟的分布式消息中间件,在实战应用中需要注意:

  1. 合理设计Topic和Tag:业务隔离,便于管理
  2. 根据场景选择消息类型:普通消息、顺序消息、事务消息等
  3. 重视监控告警:及时发现消息堆积、消费延迟等问题
  4. 做好容灾预案:Broker宕机、网络分区等异常情况的处理方案
  5. 性能与可靠性平衡:根据业务需求调整刷盘方式、复制策略等

通过本教程的系统学习,相信您已经掌握了RocketMQ的核心概念、部署方式、API使用和高级特性。在实际项目中,建议结合具体业务场景灵活运用这些知识,并持续关注RocketMQ社区的动态和新特性。

相关实践学习
5分钟轻松打造应对流量洪峰的稳定商城交易系统
本实验通过SAE极速部署一个微服务电商商城,同时结合RocketMQ异步解耦、削峰填谷的能力,带大家体验面对流量洪峰仍旧稳定可靠的商城交易系统!
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
人工智能 监控 安全
NTP网络子钟的技术架构与行业应用解析
在数字化与智能化时代,时间同步精度至关重要。西安同步电子科技有限公司专注时间频率领域,以“同步天下”品牌提供可靠解决方案。其明星产品SYN6109型NTP网络子钟基于网络时间协议,实现高精度时间同步,广泛应用于考场、医院、智慧场景等领域。公司坚持技术创新,产品通过权威认证,未来将结合5G、物联网等技术推动行业进步,引领精准时间管理新时代。
|
9天前
|
物联网 Linux 开发者
快速部署自己私有MQTT-Broker-下载安装到运行不到一分钟,快速简单且易于集成到自己项目中
本文给物联网开发的朋友推荐的是GMQT,让物联网开发者快速拥有合适自己的MQTT-Broker,本文从下载程序到安装部署手把手教大家安装用上私有化MQTT服务器。
139 5
|
26天前
|
消息中间件 监控 Docker
Docker环境下快速部署RabbitMQ教程。
就这样,你成功地用魔法召唤出了RabbitMQ,还把它和你的应用程序连接了起来。现在,消息会像小溪流水一样,在你的系统中自由流淌。别忘了,兔子们不喜欢孤独,他们需要你细心的关怀,不时地监控它们,确保他们的世界运转得井井有条。
116 18
|
2月前
|
机器学习/深度学习 算法 测试技术
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析
本文探讨了基于图的重排序方法在信息检索领域的应用与前景。传统两阶段检索架构中,初始检索速度快但结果可能含噪声,重排序阶段通过强大语言模型提升精度,但仍面临复杂需求挑战
91 0
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析
|
2月前
|
缓存 算法 网络协议
IP代理技术原理深度解析:从基础架构到应用实践
IP代理是网络通信中的关键技术,通过构建中间层实现请求转发与信息过滤。其核心价值体现在身份伪装、访问控制和性能优化三个方面。文章详细解析了HTTP与SOCKS协议的工作机制,探讨了代理服务器从传统单线程到分布式集群的技术演进,并分析了在网络爬虫、跨境电商及企业安全等场景的应用。同时,面对协议识别、性能瓶颈和隐私合规等挑战,提出了多种解决方案。未来,IP代理将融合边缘计算、AI驱动优化及量子安全加密等趋势,持续发展为支撑现代互联网的重要基础设施。
169 2
|
1月前
|
消息中间件 存储 大数据
阿里云消息队列 Kafka 架构及典型应用场景
阿里云消息队列 Kafka 是一款基于 Apache Kafka 的分布式消息中间件,支持消息发布与订阅模型,满足微服务解耦、大数据处理及实时流数据分析需求。其通过存算分离架构优化成本与性能,提供基础版、标准版和专业版三种 Serverless 版本,分别适用于不同业务场景,最高 SLA 达 99.99%。阿里云 Kafka 还具备弹性扩容、多可用区部署、冷热数据缓存隔离等特性,并支持与 Flink、MaxCompute 等生态工具无缝集成,广泛应用于用户行为分析、数据入库等场景,显著提升数据处理效率与实时性。
|
7月前
|
弹性计算 API 持续交付
后端服务架构的微服务化转型
本文旨在探讨后端服务从单体架构向微服务架构转型的过程,分析微服务架构的优势和面临的挑战。文章首先介绍单体架构的局限性,然后详细阐述微服务架构的核心概念及其在现代软件开发中的应用。通过对比两种架构,指出微服务化转型的必要性和实施策略。最后,讨论了微服务架构实施过程中可能遇到的问题及解决方案。
|
8月前
|
Cloud Native Devops 云计算
云计算的未来:云原生架构与微服务的革命####
【10月更文挑战第21天】 随着企业数字化转型的加速,云原生技术正迅速成为IT行业的新宠。本文深入探讨了云原生架构的核心理念、关键技术如容器化和微服务的优势,以及如何通过这些技术实现高效、灵活且可扩展的现代应用开发。我们将揭示云原生如何重塑软件开发流程,提升业务敏捷性,并探索其对企业IT架构的深远影响。 ####
187 3
|
8月前
|
Cloud Native 安全 数据安全/隐私保护
云原生架构下的微服务治理与挑战####
随着云计算技术的飞速发展,云原生架构以其高效、灵活、可扩展的特性成为现代企业IT架构的首选。本文聚焦于云原生环境下的微服务治理问题,探讨其在促进业务敏捷性的同时所面临的挑战及应对策略。通过分析微服务拆分、服务间通信、故障隔离与恢复等关键环节,本文旨在为读者提供一个关于如何在云原生环境中有效实施微服务治理的全面视角,助力企业在数字化转型的道路上稳健前行。 ####
|
3月前
|
Cloud Native Serverless 流计算
云原生时代的应用架构演进:从微服务到 Serverless 的阿里云实践
云原生技术正重塑企业数字化转型路径。阿里云作为亚太领先云服务商,提供完整云原生产品矩阵:容器服务ACK优化启动速度与镜像分发效率;MSE微服务引擎保障高可用性;ASM服务网格降低资源消耗;函数计算FC突破冷启动瓶颈;SAE重新定义PaaS边界;PolarDB数据库实现存储计算分离;DataWorks简化数据湖构建;Flink实时计算助力风控系统。这些技术已在多行业落地,推动效率提升与商业模式创新,助力企业在数字化浪潮中占据先机。
243 12

热门文章

最新文章