问题描述
消息队列的消息幂等性,主要是由 MQ 重试机制引起的。
我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。
举个例子,一个消息 M 发送到了消息中间件,消息投递到了消费程序 A,A 接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。然而这种可靠的特性导致,消息可能被多次地投递。
再举个例子,程序 A 接受到这个消息 M 并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序 A 来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。
解决方案
基于业务表的去重
假如我们的逻辑是这样的:
select * from t_order where order_no = 'order123' if(order != null) { return ;//消息重复,直接返回 }
那么在并下情况下可能会有问题(当业务还没处理完,此时并发的请求进来了,则会穿透掉检查的挡板),所以要考虑原子性问题。
解决的办法在我们上一篇文章中也有介绍到,即使用:
- 悲观锁(
SELECT ... FOR UPDATE
) - 乐观锁
但无论是 select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。
基于消息表+本地事务
我们以 RocketMQ 作为消息中间件为例, RocketMQ 的 文档 中描述道:
“RocketMQ
无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是 msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)”
“msgId 一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同 msgId 的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。”
根据文档的描述我们得到了第一种解决方案,即通过去重表
的方法,这点在上一篇文章 《幂等解决方案集合(一)》 中已有所描述,需要注意的是:
“在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。”
具体来说,我们可以这样做:在数据库中增加一个消息消费记录表,把业务操作和这个消息插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。
- 开启事务
- 插入消息表(处理好主键冲突的问题)
- 更新订单表(原消费逻辑)
- 提交事务
但是这里有它的局限性
- 消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如 Redis 这种不支持事务特性的数据源,则这些数据是不可回滚的。
- 数据库的数据必须是在一个库,跨库无法解决
使用 Exactly-Once 投递语义收发消息
Exactly-Once 投递语义
以下引用自阿里云的文档:
“Exactly-Once
是指发送到消息系统的消息只能被消费端处理且仅处理一次,即使生产端重试消息发送导致某消息重复投递,该消息在消费端也只被消费一次。”
“Exactly-Once
语义是消息系统和流式计算系统中消息流转的最理想状态,但是在业界并没有太多理想的实现。因为真正意义上的 Exactly-Once 依赖消息系统的服务端、消息系统的客户端和用户消费逻辑这三者状态的协调。例如,当您的消费端完成一条消息的消费处理后出现异常宕机,而消费端重启后由于消费的位点没有同步到消息系统的服务端,该消息有可能被重复消费。”
“业界对于Exactly-Once
投递语义存在很大的争议,很多人会拿出“FLP 不可能理论”或者其他一致性定律对此议题进行否定,但事实上,特定场景的 Exactly-Once 语义实现并不是非常复杂,只是因为通常大家没有精确的描述问题的本质。”
“如果您要实现一条消息的消费结果只能在业务系统中生效一次,您需要解决的只是如何保证同一条消息的消费幂等问题。消息队列RocketMQ
版的Exactly-Once
语义就是解决业务中最常见的一条消息的消费结果(消息在消费端计算处理的结果)在数据库系统中有且仅生效一次的问题。”
典型使用场景
“在电商系统中,上游实时计算模块发布商品价格变更的信息,异步通知到下游商品管理模块进行价格变更。此时,需要保证每一条信息的消费幂等,即重复的价格变更信息只会生效一次,这样便不会发生价格多次重复修改的情况,确保实现了消息消费的幂等。”
操作步骤
1 添加依赖
消息队列 RocketMQ 版的 ExactlyOnceConsumer 在客户端 SDK ons-client-ext-1.8.4.Final 中发布,若要使用 Exactly-Once 投递语义,需在应用中依赖该 SDK。另外,ExactlyOnceConsumer 基于 Spring 实现了通过注解@MQTransaction 开启 Exactly-Once 消费的方式,因此还需要在应用中增加 Spring 3.0 以上版本的依赖。
完整的依赖内容如下所示。
<dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client-ext</artifactId> <version>1.8.4.Final</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring-version}</version> </dependency>
2 在用于存储消息消费结果的数据库中创建 transaction_record 表 (注意:存储消息消费结果的数据库系统必须支持本地事务)
若要使用消息队列 RocketMQ 版的 Exactly-Once 投递语义,您需要在业务处理结果持久化的数据库中创建一张 transaction_record 表,保证此表与存储业务处理结果的表在同一个数据库内,且该数据库支持本地事务。目前,消息队列 RocketMQ 版的 Exactly-Once 投递语义支持您的业务访问 MySQL 和 SQLServer 两种类型的数据源。这两种类型的数据源下 transaction_record 表的建表语句如以下所示 (mysql)。
CREATE TABLE `transaction_record` ( `consumer_group` varchar(128) NOT NULL DEFAULT '', `message_id` varchar(255) NOT NULL DEFAULT '', `topic_name` varchar(255) NOT NULL DEFAULT '', `ctime` bigint(20) NOT NULL, `queue_id` int(11) NOT NULL, `offset` bigint(20) NOT NULL, `broker_name` varchar(255) NOT NULL DEFAULT '', `id` bigint(20) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`), UNIQUE KEY `message_id_unique_key` (`message_id`), KEY `ctime_key` (`ctime`), KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3 在消息生产端使用 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性设置打开 Exactly-Once 投递语义
/** * TestExactlyOnceProducer 启动 * 通过 PropertyKeyConst.EXACTLYONCE_DELIVERY 开启 exactly-once 投递语义。 */ public class TestExactlyOnceProducer { public static void main(String[] args) { Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.GROUP_ID,"{GROUP_ID}"); producerProperties.setProperty(PropertyKeyConst.AccessKey,"{accessKey}"); producerProperties.setProperty(PropertyKeyConst.SecretKey,"{secretKey}"); producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR,"{NAMESRV_ADDR}"); producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY,"true"); Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties); producer.start(); System.out.println("Producer Started"); for (int i = 0; i < 10; i++) { Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes()); try { SendResult sendResult = producer.send(message); assert sendResult != null; System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId()); } catch (ONSClientException e) { System.out.println("发送失败"); //出现异常意味着发送失败,为避免消息丢失,建议缓存该消息然后进行重试。 } } producer.shutdown(); } }
4 在消息消费端创建 ExactlyOnceConsumer,并开启 Exactly-Once 的消费模式。
使用消息队列 RocketMQ 版的 Exactly-Once 投递语义进行消费时,消费端需要使用 ExactlyOnceONSFactory 调用 createExactlyOnceConsumer 接口创建 ExactlyOnceConsumer,然后通过使用 ExactlyOnceConsumer 进行 Exactly-Once 模式的消费。
在使用 ExactlyOnceConsumer 时,需要注意以下两点:
- 创建 ExactlyOnceConsumer 时,可以通过设置 PropertyKeyConst.EXACTLYONCE_DELIVERY 属性打开或者关闭 Exactly-Once 投递语义。ExactlyOnceConsumer 默认打开 Exactly-Once 投递语义。
- 使用 ExactlyOnceConsumer 消费时,在消息监听器 MessageListener 的 consume 方法中,您的业务处理逻辑需要使用 MQDataSource 对数据库的进行读写操作。
您可以选择以下任一方式在消费端开启 Exactly-Once 投递语义:
- 以非 Spring 方式开启 Exactly-Once 投递语义
- MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性
- MessageListener 中通过 Springboot 注解方式实现开启 Exactly-Once 投递语义
- MessageListener 中通过 MyBatis 方式实现 Exactly-Once 投递语义
以下示例为:MessageListener 中以事务方式实现多项数据库操作和消息消费的事务性,其他示例代码,请参考阿里云文档:
https://help.aliyun.com/document_detail/102777.html
/** * TestExactlyOnceListener 实现。 * 实现了一个事务中对多个业务表进行更新的场景,保证事务内的操作有且仅有一次生效。 */ public class SimpleTxListener implements MessageListener { private MQDataSource dataSource; public SimpleTxListener() { this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}"); } @Override public Action consume(Message message, ConsumeContext context) { Connection connection = null; Statement statement = null; try { /** * 在此处对消费到的消息 message 做业务计算处理,使用 MQDataSource 将处理结果持久化到数据库系统。 * 此范例演示了在一个事务内对多个表进行更新的业务场景,Exactly-Once 投递语义保证事务内的操作有且仅有一次。 * 实际的业务处理按照:接收消息->业务处理->结果持久化的流程来设计。 */ connection = dataSource.getConnection(); connection.setAutoCommit(false); String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)", new String(message.getBody()), message.getMsgID(), System.currentTimeMillis()); String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"", "GID_TEST"); statement = connection.createStatement(); statement.execute(insertSql); statement.execute(updateSql); connection.commit(); System.out.println("consume message :" + message.getMsgID()); return Action.CommitMessage; } catch (Throwable e) { try { connection.rollback(); } catch (Exception e1) { } System.out.println("consume message fail"); return Action.ReconsumeLater; } finally { if (statement != null) { try { statement.close(); } catch (Exception e) { } } if (connection != null) { try { connection.close(); } catch (Exception e) { } } } } }
总结
你可以看到,这种方式是阿里云做了封装,但理论上跟上一个方案 基于消息表+本地事务 是一样的。
利用 Redis
最后这种方案其实思路跟 基于消息表+本地事务 类似,只不过判断是否重复消息用 Redis 分布式锁实现,关于具体实现步骤也跟我们上一篇文章类似,请参考上一篇文章。
最后
当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入 死信队列
,最终进入到数据库中,接着设置定时 job 查询这些数据,进行手动补偿。