SpringBoot整合RocketMQ发送顺序消息

简介: 严格按照消息的发送顺序进行消费的消息。默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列,而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的,如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性

1. 有序性分类
根据有序范围的不同,可以分为两种消息的有序性:分区有序和全局有序
分区有序
有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,称为分区有序
在定义Producer时我们可以指定消息队列选择器,而这个选择器是我们自己实现了MessageQueueSelector接口定义的。在定义选择器的选择算法时,一般需要使用选择key。这个选择key可以是消息key也可以是其它数据。但无论谁做选择key,都不能重复,都是唯一的。

一般性的选择算法是,让选择key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。

取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。若是当前Consumer需要消费的消息,则直接消费,否则,什么也不做。这种做法要求选择key要能够随着消息一起被Consumer获取到。此时使用消息key作为选择key是比较好的做法。
全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序
在创建Topic时指定Queue的数量。有三种指定方式:

在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
使用mqadmin命令手动创建Topic时指定Queue数量
2. 生产者业务接口

public interface OrderMessageService {

    /**
     * 发送同步顺序消息
     * @param id
     * @param message
     */
    void sendSyncOrderMessage(String id, String message);

    /**
     * 发送异步顺序消息
     * @param id
     * @param message
     */
    void sendAsyncOrderMessage(String id, String message);

    /**
     * 发送单向顺序消息
     * @param id
     * @param message
     */
    void sendOnewayOrderMessage(String id, String message);
}

3. 生产者业务接口实现类


@Service
public class OrderMessageServiceImpl implements OrderMessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private static final Logger logger = LoggerFactory.getLogger(OrderMessageServiceImpl.class);

    @Override
    public void sendSyncOrderMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
                Integer uid = Integer.valueOf(String.valueOf(obj));
                int index = uid % list.size();
                return list.get(index);
            }
        });
        SendResult result = rocketMQTemplate.syncSendOrderly("order-message-topic:sync-tags", strMessage, id);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("发送同步顺序消息成功!");
        } else {
            logger.error("发送同步顺序消息失败!消息ID为:{}", result.getMsgId());
        }
    }

    @Override
    public void sendAsyncOrderMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
                Integer uid = (Integer) obj;
                int index = uid % list.size();
                return list.get(index);
            }
        });
        rocketMQTemplate.asyncSendOrderly("order-message-topic:async-tags", strMessage, id, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
                    logger.info("发送异步顺序消息成功!消息ID为:{}", sendResult.getMsgId());
                }
            }
            @Override
            public void onException(Throwable throwable) {
                logger.info("发送异步顺序消息失败!失败原因为:{}", throwable.getMessage());
            }
        });
    }

    @Override
    public void sendOnewayOrderMessage(String id, String message) {
        Message<String> strMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, id).build();
        rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object obj) {
                Integer uid = (Integer) obj;
                int index = uid % list.size();
                return list.get(index);
            }
        });
        rocketMQTemplate.sendOneWayOrderly("order-message-topic:oneway-tags", strMessage, id);
    }
}

4. 消费者类

@Component
@RocketMQMessageListener(topic = "order-message-topic", consumerGroup = "order-consumer-group", consumeMode = ConsumeMode.ORDERLY)
public class OrderMessageListener implements RocketMQListener<String> {

    private static final Logger logger = LoggerFactory.getLogger(OrderMessageListener.class);

    @Override
    public void onMessage(String message) {
        logger.info("接收到顺序消息为:{}", message);
    }
}

5. 测试

@Test
void orderMessage() {
    for (int i = 1; i < 5; i++) {
        orderMessageService.sendSyncOrderMessage(String.valueOf(i), "hello" + i);
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
1087 1
|
5月前
|
监控 安全 Java
Java 开发中基于 Spring Boot 3.2 框架集成 MQTT 5.0 协议实现消息推送与订阅功能的技术方案解析
本文介绍基于Spring Boot 3.2集成MQTT 5.0的消息推送与订阅技术方案,涵盖核心技术栈选型(Spring Boot、Eclipse Paho、HiveMQ)、项目搭建与配置、消息发布与订阅服务实现,以及在智能家居控制系统中的应用实例。同时,详细探讨了安全增强(TLS/SSL)、性能优化(异步处理与背压控制)、测试监控及生产环境部署方案,为构建高可用、高性能的消息通信系统提供全面指导。附资源下载链接:[https://pan.quark.cn/s/14fcf913bae6](https://pan.quark.cn/s/14fcf913bae6)。
913 0
|
11月前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
260 6
|
消息中间件 Java 测试技术
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
这篇文章是关于如何在SpringBoot应用中整合RabbitMQ的消息中间件。内容包括了在SpringBoot项目中添加RabbitMQ的依赖、配置文件设置、启动类注解,以及如何通过单元测试来创建交换器、队列、绑定,并发送和接收消息。文章还介绍了如何配置消息转换器以支持对象的序列化和反序列化,以及如何使用注解`@RabbitListener`来接收消息。
消息中间件RabbitMQ---SpringBoot整合RabbitMQ【三】
|
消息中间件 Java Maven
|
网络协议 Java 物联网
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
MQTT(EMQX) - SpringBoot 整合MQTT 连接池 Demo - 附源代码 + 在线客服聊天架构图
2346 3
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
1065 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
68042 2
3 张图带你彻底理解 RocketMQ 事务消息

热门文章

最新文章