MQ四兄弟:如何保证消息顺序性

简介: 在分布式系统中,消息队列(MQ)是确保组件间高效通信的关键。RabbitMQ、RocketMQ、Kafka和Pulsar通过不同机制保证消息顺序性:RabbitMQ依赖单一队列和消费者模式;RocketMQ使用MessageQueueSelector;Kafka基于Partition和Key;Pulsar通过分区主题和键路由。这些系统的核心思想是将相同特征的消息发送到同一队列或分区,并按先进先出原则消费,从而确保消息顺序性。

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。


添加图片注释,不超过 140 字(可选)


RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。

public class Send {     private final static String QUEUE_NAME = "hello";     public static void main(String[] argv) throws Exception {         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("localhost");         try (Connection connection = factory.newConnection();              Channel channel = connection.createChannel()) {             channel.queueDeclare(QUEUE_NAME, false, false, false, null);             String message = "Hello World!";             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());             System.out.println(" [x] Sent '" + message + "'");         }     } }

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

示例代码:

生产者

// 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("nameserver:9876"); // 启动Producer实例 producer.start(); // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes()); // 发送有序消息 producer.send(msg, new MessageQueueSelector() {     @Override     public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {         Integer orderId = (Integer) arg; // 订单ID作为选择器的参数         int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引         return mqs.get(index); // 返回该索引对应的MessageQueue     } }, orderId);

通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

消费端

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");     consumer.setNamesrvAddr("nameserver:9876");     consumer.subscribe("TopicTest", "TagA");     consumer.registerMessageListener(new MessageListenerOrderly() {         @Override         public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {             context.setAutoCommit(true);             for (MessageExt msg : msgs) {                 System.out.printf("Consumer: %s %n", new String(msg.getBody()));             }             return ConsumeOrderlyStatus.SUCCESS;         }     });

消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。


Kafka

Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

生产者代码

public class OrderProducer {     public static void main(String[] args) {         Properties props = new Properties();         props.put("bootstrap.servers", "localhost:9092");         KafkaProducer<String, String> producer = new KafkaProducer<>(props);         // 假设我们有10个订单,每个订单的消息需要顺序处理         for (int orderId = 0; orderId < 10; orderId++) {             for (int i = 0; i < 5; i++) { // 每个订单发送5条消息                 String message = String.format("Order %d, Message %d", orderId, i);                 producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));             }         }         producer.close();     } }

producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));

第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

消费者代码

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "order_consumer_group"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("OrderTopic"));

在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

Pulsar

Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

生产者代码示例:

public class PulsarOrderProducer {     public static void main(String[] args) throws Exception {         PulsarClient client = PulsarClient.builder()                 .serviceUrl("pulsar://localhost:6650")                 .build();         Producer<String> producer = client.newProducer(Schema.STRING)                 .topic("persistent://public/default/my-topic")                 .create();         for (int i = 0; i < 100; i++) {             String key = "OrderID" + (i % 10); // 假设OrderID是业务键             String value = "Message" + i;             producer.newMessage()                     .key(key)                     .value(value)                     .send();         }         producer.close();         client.close();     } }

消费者代码示例:

public class PulsarOrderConsumer {     public static void main(String[] args) throws Exception {         PulsarClient client = PulsarClient.builder()                 .serviceUrl("pulsar://localhost:6650")                 .build();         Consumer<String> consumer = client.newConsumer(Schema.STRING)                 .topic("persistent://public/default/my-topic")                 .subscriptionName("my-subscription")                 .subscriptionType(SubscriptionType.Exclusive)                 .subscribe();         while (true) {             Message<String> msg = consumer.receive();             try {                 // 处理消息                 System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());                 consumer.acknowledge(msg);             } catch (Exception e) {                 consumer.negativeAcknowledge(msg);             }         }     } }

在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

总结


添加图片注释,不超过 140 字(可选)


尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。


相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
存储 算法 Oracle
极致八股文之JVM垃圾回收器G1&ZGC详解
本文作者分享了一些垃圾回收器的执行过程,希望给大家参考。
|
消息中间件 存储 运维
Rabbitmq消息大量堆积怎么办?
该文讨论了一个系统架构问题,主要涉及RabbitMQ在处理订单消息时遇到的性能瓶颈。首先,系统使用RabbitMQ是为了解耦和提高性能,前端创建订单后通过RabbitMQ发送消息给订单履约系统消费并执行后续操作。当订单流量激增时,消息堆积导致服务器压力增加。 排查解决方案: 1. 增加消费者以提高消费速度,但发现即使增加消费者,消息堆积问题仍未解决。 2. 分析消费者逻辑,发现调用库存系统接口可能导致处理速度慢。库存系统压力大,接口响应慢,加剧问题。 3. 实施清空堆积消息的策略,新建消费者快速消费消息并存储在表中,减轻服务器压力。待库存服务恢复后,再将消息推回RabbitMQ处理。
1124 1
|
6月前
|
消息中间件 负载均衡
RabbitMQ的工作模型?
RabbitMQ 核心模型包括交换机、队列和绑定,支持五种消息模式:简单队列、工作队列、发布/订阅、路由和主题模式,适用于不同场景的消息通信与分发。
1165 0
|
消息中间件 存储 RocketMQ
Rocketmq如何保证消息不丢失
文章分析了RocketMQ如何通过生产者端的同步发送与重试机制、Broker端的持久化存储与消息重试投递策略、以及消费者端的手动提交ack与幂等性处理,来确保消息在整个传输和消费过程中的不丢失。
|
消息中间件 RocketMQ
如何保证RocketMQ消息有序?
如何保证RocketMQ消息有序?
|
11月前
|
消息中间件
使用RabbitMQ如何保证消息不丢失 ?
消息从发送,到消费者接收,会经理多个过程 , 其中的每一步都可能导致消息丢失 针对这些问题,RabbitMQ分别给出了解决方案: ● 消息发送到交换机丢失 : 发布者确认机制publisher-confirm消息发送到交换机失败会向生产者返回ACK , 生产者通过回调接收发送结果 , 如果发送失败, 重新发送, 或者记录日志人工介入 ● 消息从交换机路由到队列丢失 : 发布者回执机制publisher-return消息从交换机路由到队列失败会向生产者返回失败原因 , 生产者通过回调接收回调结果 , 如果发送失败, 重新发送, 或者记录日志人工介入 ● 消息保存到队列中丢失 : MQ持久化(交
|
消息中间件 存储 运维
2024最全RabbitMQ集群方案汇总
本文梳理了RabbitMQ集群的几种方案,主要包括普通集群、镜像集群(高可用)、Quorum队列(仲裁队列)、Streams集群模式(高可用+负载均衡)和插件方式。重点介绍了每种方案的特点、优缺点及适用场景。搭建步骤包括安装Erlang和RabbitMQ、配置集群节点、修改hosts文件、配置Erlang Cookie、启动独立节点并创建集群,以及配置镜像队列以提高可用性和容错性。推荐使用Quorum队列与Streams模式,其中Quorum队列适合高可用集群,Streams模式则同时支持高可用和负载均衡。此外,还有Shovel和Federation插件可用于特定场景下的集群搭建。
3105 2
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
5362 1
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
1055 2
|
消息中间件 存储 监控
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别,设计目标、适用场景、吞吐量、消息存储和持久化、可靠性、集群负载均衡
RabbitMQ、Kafka对比(超详细),Kafka、RabbitMQ、RocketMQ的区别