配置 MQ 加入事务,可以保证数据库操作与消息发送的一致性。
数据库操作提交,则消息一定发送成功。
数据库操作回滚,则消息一定不会被发送出去。
本文档提供了两种将 MQ 加入事务的方法,详细步骤请参见 操作步骤 部分。
应用场景
有些系统在使用数据库保证系统内数据一致的同时, 也会使用消息队列(MQ) 作为和其他系统间的消息传递, 完成不同系统间的数据一致。
业务逻辑
A 转账给 B 了 10 次,前五次成功,后五次失败。
在 A 转账给 B 成功后,增加了 MQ 的一个通知,告知转账成功。
使用 GTS 事务保证了 A 和 B 钱的总数始终不变,同时保证了只有转账成功的情况下 MQ 通知才可见。
前提条件
已申请了事务分组,并申请了一个 MQ 服务。
已准备好两台 RDS 和一台 ECS。 ECS 用于部署本应用。
在两个 RDS 实例中分别执行 txc_sample1.sql 、 txc_sample2.sql 和 txc_undo_log.sql 完成建表。
操作步骤
GTS 提供了两种方法将 MQ 加入事务。
通过代码将 MQ 加入事务
通过样例将 MQ 加入事务
通过代码将 MQ 加入事务
在配置文档 xml 中配置好 MQ 资源,例如:
<bean class="com.ta obao.txc.client.aop.MTRelationShipManager">
<property name="beanNames" ref="mtServicesClassList" />
<property name="interceptorNames">
<list>
<value>mtBranchInterceptor</value>
</list>
</property>
<property name="order" value="1"></property>
<property name="proxyTargetClass" value="false"></property>
</bean>
<bean id="mtBranchInterceptor" class="com.taobao.txc.resourcemanager.mt.MtBranchInterceptor"/>
<bean id="mtServicesClassList" class="org.springframework.beans.factory.config.ListFactoryBean">
<property name="sourceList">
<list>
<value>com.taobao.txc.rm.mq.TxcMQProducer</value>
</list>
</property>
</bean>
<bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl">
<constructor-arg name="ProducerId" value="PID_txc_mq_prod_test"/>
<constructor-arg name="AccessKey" value="XXX"/>
<constructor-arg name="SecretKey" value="XXX"/>
</bean>
在代码中启动一个 MQ 资源,例如:
TxcMQProducer txcMQProducer = (TxcMQProducer) context.getBean("txc_mq_producer");
txcMQProducer.start();
System.out.println("Producer started!");
要实现 MQ 事务,需要在 GTS 注解函数范围内发用一个 MQ 消息,例如:
@TxcTransaction(appName = "myapp")
public void update(Connection connection1, Connection connection2) throws SQLException, MQClientException {
//数据源操作
update1(connection1);
update2(connection2);
//创建一个消息
Message msg = new Message(/*topic*/"txc_mq_test",
/*tag*/"onePay",
String.format("onePay message:success\n").getBytes());
/* 发送一个消息,事务完成消息可见 */
SendResult sendResult = txcMQProducer.send(null, 0, msg);
}
MQ 的 consumer 工程的配置与使用可以参考下面这个例子实现:
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, "CID_txc_mq_prod_test");
properties.put(PropertyKeyConst.AccessKey, "XXX");
properties.put(PropertyKeyConst.SecretKey, "XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("txc_mq_test", "*", new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
log.info(String.format("SMSListener got message:%s", message));
System.out.println(String.format("SMSListener got message:%s", message));
System.out.println(new String(message.getBody()));
return Action.CommitMessage;
}
});
consumer.start();
通过样例将 MQ 加入事务
下载样例工程,解压,得到 sample-txc-mq 文件夹。
把该工程文件夹到 ECS 服务器上,找到 sample-txc-mq/src/main/resources 目录下的 txc-mq-client-context.xml 文件,对该文件中的两个数据源进行修改,分别替换为两个 RDS 的数据源。
将该文件中 <constructor-arg value="xxxxx"/> 的 xxxxx 替换为在步骤 1 中申请的 GTS 组 id。
将自己申请的 MQ 相关信息配置到该文件中,即将 <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl"> 下的参数替换为自己的 MQ 配置。
将该文件中 <property name=" accessKey" value="xxxxx"/><property name=" secretKey" value="xxxxx"/> 的 xxxxx 分别替换为您阿里云账户的 Access Key ID 和 Access Key Secret。
在 sample-txc-mq 目录下执行 build.sh 编译本工程。编译完成后在 sample-txc-mq/client/bin 目录下执行 run.sh 可以看到 MQ 的 provider 运行结果。
将 sample-mq-consumer 工程拷贝到 ECS 服务器中,在 sample-mq-consumer/src/main/java/com/taobao/txc/tests 中找到 SMSListener.java,修改其中的 xxxxx 为申请的 MQ 配置。在 sample-mq-consumer 目录下执行 build.sh 编译该工程,编译完成后 在sample-mq-consumer/client/bin 目录下执行 run.sh 可以 consumer 掉刚刚 sample-txc-mq 工程生产出来的 MQ 消息。
sample-txc-mq 工程的 Java 源代码在 /sample-txc-mq/src/main/java/com/taobao/txc/tests 目录下,可以根据业务需求修改。
注意事项
在部署 MQ 下的 GTS 应用时,需要在代码中配置 MQ 资源,包括 ProducerId、consumerId 和AccessKeys。
如果您还没有 MQ 资源,需要申请一个 MQ 资源,MQ 资源申请参考 步骤二:申请资源。
com.taobao.txc.client.aop.MTRelationShipManager 一定要先于com.taobao.txc.client.aop.TxcTransactionScaner 进行声明。
结果验证
通过上述 GTS 配置可以将 MQ 消息和多个数据源加入一个事务中,MQ 消息和多个数据源会保持强一致,一旦有异常返回给带 @TxcTransaction 注解的方法,都会导致这个全局事务数据回滚到之前的状态,同时 MQ 消息也不会发送出去。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。