MQ四兄弟:如何保证消息可靠性

简介: 本文介绍了RabbitMQ、RocketMQ、Kafka和Pulsar四种消息中间件的可靠性机制。这些中间件通过以下几种方式确保消息的可靠传输:1. 消息持久化,确保消息在重启后不会丢失;2. 确认机制,保证消息从生产者到消费者都被成功处理;3. 重试机制,处理失败后的重试;4. 死信队列,处理无法消费的消息。每种中间件的具体实现略有不同,但核心思想相似,都是从生产者、中间件本身和消费者三个角度来保障消息的可靠性。

添加图片注释,不超过 140 字(可选)


RabbitMQ消息可靠机制

在RabbitMQ中,消息的可靠性传输可以通过以下几种机制来保证:

  1. 持久化:确保消息在RabbitMQ重启后不会丢失。
  2. 确认机制:确保消息从生产者到RabbitMQ以及从RabbitMQ到消费者都被成功处理。
  3. 重试机制:处理消息消费失败后的重试和死信队列(DLX)。

以下是一些关键机制和相应的Java代码片段:

1. 持久化(Durability)

持久化包括将队列和消息设置为持久化,这样在RabbitMQ重启后消息仍然存在。

// 声明持久化队列 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); String message = "Hello, RabbitMQ!"; // 发送持久化消息 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

2. 确认机制(Acknowledgements)

RabbitMQ提供了手动ACK机制。生产者确认(Publisher Confirms)和消费者确认(Consumer Acknowledgements)机制。

生产者确认

生产者可以开启确认模式,确保消息已被RabbitMQ接收到并持久化。

channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.confirmSelect();//启用了生产者确认模式 String message = "Hello, RabbitMQ!"; channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

消费者确认

消费者可以在处理完消息后发送确认信号,以确保消息被成功处理。

// 处理消息 try {     doWork(message); } finally { System.out.println(" [x] Done");     // 消费者手动确认消息     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; // 自动确认关闭 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

3. 重试机制和死信队列(DLX)

RabbitMQ支持死信队列(Dead Letter Exchange),可以将处理失败的消息重新路由到另一个死信队列。

try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {     // 声明死信队列     channel.queueDeclare(DLX_NAME, true, false, false, null);     Map<String, Object> args = new HashMap<>();     args.put("x-dead-letter-exchange", "");     // 设置队列的参数,使其使用死信交换器     args.put("x-dead-letter-routing-key", DLX_NAME);     // 声明正常队列,并绑定死信队列     channel.queueDeclare(QUEUE_NAME, true, false, false, args);     String message = "Hello, RabbitMQ!";     channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); } } }

消费者可以通过channel.basicReject或channel.basicNack拒绝消息,并设置requeue参数为false,使消息进入死信队列。

// 拒绝消息并不重新入队 channel.basicReject(deliveryTag, false);

RocketMQ消息可靠机制

在RocketMQ中,消息的可靠性传输可以通过以下几种机制来保证:

  1. 消息持久化:确保消息在Broker重启后不会丢失。
  2. 同步刷盘:确保消息持久化到磁盘后才确认。
  3. 消息确认机制:确保消息被正确处理。
  4. 重试机制和死信队列:处理消息发送失败后的重试和死信队列(DLX)。

以下是一些关键机制和相应的Java代码片段:

1. 消息持久化

RocketMQ默认情况下会持久化消息。 RocketMQ 不需要和rabbitmq一样显式的设置,也就是说,当您发送消息到 RocketMQ 时,消息会自动持久化到磁盘,而无需显式地设置消息为持久化。

2. 消息确认机制

RocketMQ确保消息被消费者正确处理,消费者需要对消息进行确认。

生产者

// 发送消息并同步等待结果                 SendResult sendResult = producer.send(msg);                 // 确认消息是否成功发送                 if (sendResult.getSendStatus() == SendStatus.SEND_OK) {                     System.out.printf("Message sent successfully: %s%n", sendResult);                 } else {                     System.out.printf("Message sending failed: %s%n", sendResult);                 }

消费者

consumer.registerMessageListener(new MessageListenerConcurrently() {     @Override     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {         for (MessageExt msg : msgs) {             System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));             // 处理消息         }         // 返回消息消费状态         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;     } });


3. 消息重试机制

RocketMQ支持消息发送失败后的重试机制,生产者可以配置发送重试次数。

// 设置消息发送失败后的重试次数 producer.setRetryTimesWhenSendFailed(3); // 启动生产者实例 producer.start();

4. 配置同步刷盘模式

在RocketMQ的配置文件中,可以将同步刷盘模式设置为SYNC_MASTER,以确保消息被同步到磁盘后才确认。

brokerRole = SYNC_MASTER flushDiskType = SYNC_FLUSH


Kafka消息可靠机制

在Kafka中,消息的可靠性传输通过以下几种机制来保证:

  1. 消息持久化:消息被写入磁盘,以防止数据丢失。
  2. 复制机制:在Kafka集群中,消息被复制到多个Broker上。
  3. 确认机制:确保消息被正确写入。
  4. 重试机制:处理消息发送失败后的重试。

以下是一些关键机制和相应的Java代码片段:

1. 消息持久化和确认机制

Kafka的生产者可以配置acks参数来确保消息被可靠地写入。常用的acks配置包括:

  • acks=0:生产者不会等待任何Broker的确认。
  • acks=1:生产者会等待主节点(Leader)的确认。
  • acks=all:生产者会等待所有副本节点的确认,确保消息被复制到所有副本。

生产者示例

// 设置生产者配置     Properties props = new Properties();     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());     // 设置acks为all,以确保消息被所有副本确认     props.put(ProducerConfig.ACKS_CONFIG, "all");     // 设置重试次数     props.put(ProducerConfig.RETRIES_CONFIG, 3);     // 设置每次重试的间隔时间     props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);     Producer<String, String> producer = new KafkaProducer<>(props);     try {         for (int i = 0; i < 10; i++) {             String message = "Hello Kafka " + i;             ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", Integer.toString(i), message);             // 发送消息并等待结果             Future<RecordMetadata> future = producer.send(record);             RecordMetadata metadata = future.get();         } }

消费者示例:

// 关闭自动提交偏移量     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");     // 设置自动提交偏移量的间隔时间     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);     consumer.subscribe(Collections.singletonList("my_topic"));     try {         while (true) {             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));             for (ConsumerRecord<String, String> record : records) {                 System.out.printf("Consumed message with offset: %d, partition: %d%n", record.offset(), record.partition());                 // 手动提交偏移量                 consumer.commitSync();             }         }     } catch (Exception e) {         e.printStackTrace();     } finally {         consumer.close();     }

2. 配置消息的重试机制

Kafka生产者通过配置重试机制来处理消息发送失败的情况:

props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);


Pulsar消息可靠机制

Apache Pulsar通过以下几种机制来保证消息的可靠性传输:

  1. 消息持久化:消息被持久化到磁盘,以防止数据丢失。
  2. 复制机制:在Pulsar集群中,消息被复制到多个Broker上。
  3. 确认机制:确保消息被消费者正确处理。
  4. 重试机制:处理消息发送失败后的重试。

以下是一些关键机制和相应的Java代码片段:

1. 消息持久化和复制机制

Pulsar默认情况下会持久化消息并复制到多个Broker上,以确保高可靠性。以下是一个生产者的示例,展示了如何创建一个Pulsar生产者并发送消息。

// 创建生产者,并设置生产者属性     Producer<byte[]> producer = client.newProducer(Schema.BYTES)             .topic("my-topic")             .sendTimeout(0, TimeUnit.SECONDS)  // 发送无超时限制             .producerName("my-producer")             .enableBatching(false)  // 禁用批处理,以减少延迟             .create();     // 发送消息并同步等待结果     String message = "Hello, Pulsar!";     producer.send(message.getBytes());     System.out.println("Message sent successfully");     // 关闭生产者和客户端     producer.close();     client.close(); } }


2. 消费确认机制

Pulsar消费者确保消息被正确处理并确认。以下是一个消费者的示例,展示了如何创建一个Pulsar消费者并处理消息。

// 创建消费者,并设置消费者属性     Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)             .topic("my-topic")             .subscriptionName("my-subscription")             .subscriptionType(SubscriptionType.Exclusive)             .subscribe();     // 消费消息并确认     while (true) {         Message<byte[]> msg = consumer.receive();         try {             System.out.printf("Message received: %s%n", new String(msg.getData()));             // 手动确认消息             consumer.acknowledge(msg);         } catch (Exception e) {             // 处理消息失败,负确认消息             consumer.negativeAcknowledge(msg);         }     } }

3. 配置重试机制

Pulsar的生产者可以配置重试机制来处理消息发送失败的情况。

// 创建生产者实例         Producer<byte[]> producer = client.newProducer()                 .topic("my-topic") // 设置主题                 .sendTimeout(10, java.util.concurrent.TimeUnit.SECONDS) // 设置发送超时时间                 .maxPendingMessages(1000) // 设置最大挂起消息数                 .messageRoutingMode(MessageRoutingMode.SinglePartition) // 设置消息路由模式                 .enableBatching(true) // 启用批处理                 .batchingMaxPublishDelay(1, java.util.concurrent.TimeUnit.MILLISECONDS) // 批处理最大发布延迟                 .blockIfQueueFull(true) // 当消息队列满时阻塞                 .retryLetterTopic("my-topic-retry") // 设置重试主题                 .deadLetterTopic("my-topic-dlq") // 设置死信队列主题                 .enableRetry(true) // 启用重试机制                 .initialSequenceId(0) // 设置初始序列ID                 .create();


总结


添加图片注释,不超过 140 字(可选)


可以发现消息中间件如 RabbitMQ、RocketMQ、Kafka 和 Pulsar 在保证消息可靠性方面都差不多,都是从消息的生产者、MQ本身、消费者三个方向来保证的。

  • 消息持久化机制
  • 消息确认机制
  • 重试机制
  • 死信队列
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
打赏
0
0
0
0
73
分享
相关文章
RabbitMQ如何保证消息可靠性?
RabbitMQ通过多种机制确保消息的可靠性,包括消息持久化、确认机制、消息重试与死信队列、消息去重、高可用性配置以及监控与告警机制。这些措施共同构成了RabbitMQ可靠消息传递的基础,帮助开发者在构建分布式系统时有效避免消息丢失和重复处理问题。理解并正确实施这些技术,将显著提高应用系统的稳定性和用户体验。
123 14
RabbitMQ-消息消费时的可靠性保障
将这些实践融入到消息消费的处理逻辑中,可以很大程度上保障RabbitMQ中消息消费的可靠性,确保消息系统的稳定性和数据的一致性。这些措施的实施,需要在系统的设计和开发阶段充分考虑,以及在后续的维护过程中不断的调整和完善。
89 0
深入理解MQ消息队列的高可用与可靠性策略
深入理解MQ消息队列的高可用与可靠性策略
1418 3
SpringBoot基于RabbitMQ实现消息可靠性
SpringBoot基于RabbitMQ实现消息可靠性
153 0
【微服务系列笔记】MQ消息可靠性
消息可靠性涉及防止丢失,包括生产者发送时丢失、未到达队列以及消费者消费失败处理后丢失。 确保RabbitMQ消息可靠性的方法有:开启生产者确认机制,确保消息到达队列;启用消息持久化以防止未消费时丢失;使用消费者确认机制,如设置为auto,由Spring确认处理成功后ack。此外,可开启消费者失败重试机制,多次失败后将消息投递到异常交换机。
181 1
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消费者确认机制、失败重试机制、失败处理策略、业务幂等性等内容。
351 0
RabbitMQ入门指南(九):消费者可靠性
RabbitMQ入门指南(八):MQ可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了MQ数据持久化、LazyQueue模式、管理控制台配置Lazy模式、代码配置Lazy模式、更新已有队列为lazy模式等内容。
622 0
RabbitMQ入门指南(七):生产者可靠性
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了消息丢失的可能性、生产者可靠性中的生产者重试机制和生产者确认机制等内容。
297 0
RabbitMQ入门指南(七):生产者可靠性