我们来说一下消息的可靠性投递

简介: 我是小假 期待与你的下一次相遇 ~

1. 核心概念

可靠性投递(Reliable Delivery)是指确保消息从生产者成功到达消费者,即使面对网络故障、系统崩溃等异常情况也能保证不丢失、不重复、按顺序(部分场景)传递。

2. 面临的挑战

  • 网络不可靠:丢包、延迟、分区
  • 节点故障:生产者/消费者/中间件宕机
  • 重复消费:确认机制可能引发重复
  • 顺序保证:分布式环境下消息乱序

3. 关键实现机制

3.1 生产端保证

// 伪代码示例:生产端确认模式
public void sendWithConfirm(Message msg) {
    // 1. 持久化到本地数据库(防丢失)
    messageDao.save(msg);
    
    // 2. 发送到消息队列
    String msgId = rabbitTemplate.convertAndSend(msg);
    
    // 3. 等待Broker确认
    boolean ack = waitForAck(msgId, TIMEOUT);
    
    // 4. 失败重试(指数退避)
    if (!ack) {
        retryWithBackoff(msg);
    }
    
    // 5. 最终记录投递状态
    updateDeliveryStatus(msgId, ack);
}

image.gif

技术要点

  • 事务机制:同步方式,性能差(不推荐)
  • 确认机制(Confirm)
  • 普通确认(每消息确认)
  • 批量确认(提高吞吐)
  • 异步监听(最佳实践)
  • 本地消息表:事务消息的替代方案
  • 消息持久化:设置delivery_mode=2

3.2 Broker端保证

消息处理流程:
Producer → Broker接收 → 持久化存储 → 推送给Consumer → 等待ACK → 删除/重投

image.gif

持久化策略

  • 队列持久化durable=true
  • 消息持久化delivery_mode=2
  • 镜像队列:多副本冗余(RabbitMQ)
  • 高可用集群:主从切换时不丢消息

3.3 消费端保证

// 消费端保证示例
@RabbitListener(queues = "order.queue")
public void handleOrder(OrderMessage order, Channel channel, 
                       @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    try {
        // 1. 业务处理
        orderService.process(order);
        
        // 2. 手动确认(成功才ACK)
        channel.basicAck(tag, false);
        
        // 3. 更新消费记录
        consumeRecordService.markConsumed(order.getId());
        
    } catch (Exception e) {
        // 4. 失败处理:重试或进入死信队列
        if (retryCount < MAX_RETRY) {
            channel.basicNack(tag, false, true); // 重入队列
        } else {
            channel.basicNack(tag, false, false); // 进入死信队列
            alarmService.notifyAdmin(order, e);
        }
    }
}

image.gif

消费端关键点

  • 手动ACK:避免自动确认导致消息丢失
  • 幂等性设计
public boolean processWithIdempotent(String msgId) {
    // 基于消息ID去重
    if (redis.exists("processed:" + msgId)) {
        return true; // 已处理过
    }
    
    // 业务处理
    boolean success = doBusinessLogic();
    
    // 记录处理状态
    if (success) {
        redis.setex("processed:" + msgId, 24h, "1");
    }
    
    return success;
}

image.gif

  • 死信队列(DLQ):处理无法消费的消息
  • 消费重试策略
  • 立即重试(瞬时故障)
  • 延迟重试(业务依赖)
  • 指数退避(防止雪崩)

4. 完整可靠性方案

4.1 事务消息方案(如RocketMQ)

两阶段提交:
1. 发送Half Message(预备消息)
2. 执行本地事务
3. 根据本地事务结果Commit/Rollback
4. Broker检查事务状态并投递/丢弃

image.gif

4.2 最大努力投递方案

# 补偿机制实现
def reliable_delivery(message):
    max_retries = 5
    for attempt in range(max_retries):
        try:
            # 尝试投递
            result = mq_client.send(message)
            
            if result.confirmed:
                log_delivery_success(message.id)
                return True
                
        except Exception as e:
            log_failure(attempt, e)
            
            if attempt == max_retries - 1:
                # 最终失败,人工介入
                send_alert_to_admin(message)
                save_to_compensation_table(message)
                return False
                
            # 等待后重试
            sleep(backoff_time(attempt))
    
    return False

image.gif

4.3 本地消息表方案(经典)

-- 本地消息表结构
CREATE TABLE local_message (
    id BIGINT PRIMARY KEY,
    biz_id VARCHAR(64),      -- 业务ID
    content TEXT,           -- 消息内容
    status TINYINT,         -- 0:待发送, 1:已发送, 2:已确认
    retry_count INT,
    next_retry_time DATETIME,
    created_at TIMESTAMP
);

image.gif

工作流程

  1. 业务数据+消息记录原子性写入本地DB
  2. 定时任务扫描待发送消息
  3. 调用MQ发送,成功后更新状态
  4. 消费者处理完成后反向确认
  5. 对账程序定期校验数据一致性

5. 高级特性与优化

5.1 顺序性保证

  • 全局有序:单队列单消费者(性能低)
  • 局部有序:相同sharding key的消息发到同一队列
  • 牺牲场景:重试队列可能破坏顺序

5.2 批量消息可靠性

// 批量消息的可靠性处理
public class BatchMessageReliableSender {
    
    public void sendBatch(List<Message> batch) {
        // 1. 批量持久化到本地
        batchMessageDao.saveAll(batch);
        
        // 2. 设置批次ID
        String batchId = generateBatchId();
        
        // 3. 发送批次消息
        boolean success = mqTemplate.sendBatch(batchId, batch);
        
        // 4. 批次确认(或单条补偿)
        if (success) {
            markBatchDelivered(batchId);
        } else {
            // 逐条重试或记录异常
            compensateFailedMessages(batch);
        }
    }
}

image.gif

5.3 监控与对账

  • 实时监控
  • 堆积情况监控
  • 消费延迟报警
  • 失败率统计
  • 定期对账:
-- 消息对账SQL示例
SELECT 
  DATE(create_time) as day,
  COUNT(*) as total_sent,
  SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) as confirmed,
  SUM(CASE WHEN status=1 THEN 1 ELSE 0 END) as pending
FROM message_record
GROUP BY DATE(create_time)
HAVING total_sent != confirmed;

image.gif

6. 不同MQ的实现差异

特性 RabbitMQ Kafka RocketMQ
可靠性机制 确认+持久化+镜像队列 副本机制+ACK+Exactly-Once 事务消息+本地存储
顺序性 单队列保证 Partition内有序 Queue内有序
事务支持 轻量级事务(性能差) 支持Exactly-Once语义 完整事务消息
最佳适用场景 业务消息、高可靠要求 日志流、大数据场景 金融交易、订单业务

7. 实践建议

  1. 分级可靠性策略
  • 关键业务:事务消息+本地表+对账
  • 普通业务:确认机制+重试+死信队列
  • 日志类:最多一次投递即可
  1. 性能与可靠性的平衡
  • 同步刷盘 vs 异步刷盘
  • 同步复制 vs 异步复制
  • 根据业务重要性选择配置
  1. 灾难恢复设计:
# 配置示例:多级降级
mq:
  primary:
    url: "amqp://primary"
    timeout: 1000ms
  secondary:
    url: "amqp://secondary"
    timeout: 2000ms
  fallback-to-db: true  # 最终降级到数据库

image.gif

总结

消息的可靠性投递是一个系统工程,需要在生产端、Broker端、消费端协同设计,结合业务场景、性能要求、成本约束做出合适的选择。没有"银弹"方案,只有最适合的方案。建议从简单方案开始,随着业务复杂度增加逐步引入更完善的可靠性机制。

面试回答

首先,消息可靠性投递指的是:

一个消息从发送到被消费者成功处理,过程中不会丢失或重复,保证最终数据的一致性。在实际系统里,消息可能因为网络问题、服务重启等原因丢失或重复,所以我们需要一套机制来确保可靠。

为什么需要它呢?

比如在订单系统中,用户支付成功后要通知物流系统,如果消息丢了,物流就不会触发,用户体验就受损;如果消息重复,可能重复发货,造成损失。所以像金融、交易这些场景,可靠性特别重要。

常见的实现方式,我了解的有几种:

  1. 生产者确认机制
    生产者发消息后,MQ(比如RabbitMQ)会返回一个确认(ACK),如果没收到ACK,生产者可以重发。这样可以防止消息在发送阶段丢失。
  2. 消息持久化
    消息保存到磁盘,而不是只放在内存。这样即使MQ重启,消息也不会丢。
  3. 消费者手动ACK
    消费者处理完消息后,手动告诉MQ“我已经处理完了”,MQ才删除消息;如果处理失败,MQ可以把消息重新投递给其他消费者。避免消息在处理阶段丢失。
  4. 事务消息(比如RocketMQ)
    先发一个“半消息”,等本地事务执行成功,再确认投递;如果失败,就回滚。这适用于分布式事务场景。
  5. 消息去重
    为了避免重复消费,可以在消费端做幂等性设计。比如在数据库里记录消息ID,每次处理前先查一下是否已经处理过。

实际中我们一般会结合业务来设计。

比如一个订单状态同步的场景,我可能会用:生产者确认 + 消息持久化 + 消费者手动ACK + 消费端幂等性。这样基本能覆盖发送、存储、消费各个环节的可靠性。

当然,可靠性和性能之间需要权衡,比如持久化会降低吞吐量,手动ACK会增加延迟。所以要根据业务需求来选择合适的方案。

追加:遇到过消息丢失或重复的问题,你是怎么排查和解决的?

追加:是否了解最终一致性、最大努力通知等模式 ?

相关文章
|
1月前
|
存储 自然语言处理 测试技术
一行代码,让 Elasticsearch 集群瞬间雪崩——5000W 数据压测下的性能避坑全攻略
本文深入剖析 Elasticsearch 中模糊查询的三大陷阱及性能优化方案。通过5000 万级数据量下做了高压测试,用真实数据复刻事故现场,助力开发者规避“查询雪崩”,为您的业务保驾护航。
1367 89
|
19天前
|
安全 Java 编译器
我们来说一下 synchronized 与 ReentrantLock 的区别
我是小假 期待与你的下一次相遇 ~
107 4
|
7月前
|
存储 Java Linux
详细地说一说零拷贝
我是小假 期待与你的下一次相遇 ~
447 1
详细地说一说零拷贝
|
19天前
|
人工智能 前端开发 Unix
从CLI原理出发,如何做好AI Coding
本文探讨CLI类AI编程工具的产品美学与技术原理,分析其遵循Unix哲学的轻量、可组合、可集成特性,解析Single Agent架构与上下文工程的实践,并分享如何通过Prompt优化、任务拆解与团队对齐,高效利用CLI提升编码效率,展望AI时代人机协作的新范式。
242 10
从CLI原理出发,如何做好AI Coding
|
19天前
|
人工智能 自然语言处理 运维
2025揭秘:7大Agent赛道,哪些值得企业重点布局?
在AI深度融入的今天,Agent已从概念走向广泛应用。具备自主决策、任务拆解与工具协同能力的智能体,正重塑工作与生活。2025年全球85%组织已部署Agent,市场规模达73.8亿美元。本文盘点企业通用、客服、医疗、工业、个人助理、教育科研及金融七大类Agent,解析其如何成为数字化转型核心引擎,释放人类创造力。
|
29天前
|
人工智能 开发者
阿里巴巴AI Coding 分享会—— Qoder Together 上海站来啦!
Qoder Together,不止技术分享,更是思维共振与灵感迸发。我们面向全球AI Coding爱好者,邀请Qoder团队、实战用户、AI Coding探索者齐聚一堂,交流激发创意,碰撞拓展边界,重新定义智能编程未来。
117 1
|
12天前
|
机器学习/深度学习 人工智能 自然语言处理
AI切文章就像切西瓜:递归字符分割让机器懂你心
你有没有试过给ChatGPT发一篇超长文章,结果它说'太长了,看不完'?就像让人一口吃下整个西瓜一样不现实!递归字符分割技术就像一个贴心的切瓜师傅,知道在哪里下刀才不会破坏瓜的甜美。掌握这项技术,让你的AI应用从'消化不良'变成'营养吸收专家'。#人工智能 #文本处理 #自然语言处理 #机器学习
119 4
|
19天前
|
传感器 人工智能 BI
2025科普手册:6大智能体类型,拆解智能重塑世界的底层逻辑
智能体正从概念走向现实,2025年被誉为“智能体元年”。全球市场规模达113亿美元,中国突破69亿元。本文盘点六大主流类型:反应式、目标驱动型、认知增强型、语音、编码及多智能体协同系统,解析其核心能力与应用场景,揭示智能体如何重塑工作与生活,推动人机协作迈向新阶段。
|
23天前
|
人工智能 搜索推荐 机器人
2025年AI智能体来了,企业却还在试水池里扑腾!
88%企业都说用AI了,但大部分还在试点阶段扑腾?AI智能体听起来很酷,实际落地却像让ChatGPT去当总经理。揭秘为什么高效企业用AI搞创新,而不是只盯着省钱。从试点到规模化,这道坎比想象中难跨! #人工智能 #AI智能体 #企业数字化 #创新管理
144 3
|
21天前
|
机器学习/深度学习 人工智能 前端开发
AI大模型爆火的SSE技术到底是什么?万字长文,一篇读懂SSE!
本文从SSE(Server-Sent Events)技术的原理到示例代码,为你通俗易懂的讲解SSE技术的方方面面。
165 1