开发者社区 问答 正文

配置 MQ 加入事务

配置 MQ 加入事务,可以保证数据库操作与消息发送的一致性。

数据库操作提交,则消息一定发送成功。
数据库操作回滚,则消息一定不会被发送出去。
本文档提供了两种将 MQ 加入事务的方法,详细步骤请参见 操作步骤 部分。

应用场景

有些系统在使用数据库保证系统内数据一致的同时, 也会使用消息队列(MQ) 作为和其他系统间的消息传递, 完成不同系统间的数据一致。

业务逻辑

A 转账给 B 了 10 次,前五次成功,后五次失败。

在 A 转账给 B 成功后,增加了 MQ 的一个通知,告知转账成功。

使用 GTS 事务保证了 A 和 B 钱的总数始终不变,同时保证了只有转账成功的情况下 MQ 通知才可见。

前提条件

已申请了事务分组,并申请了一个 MQ 服务。

已准备好两台 RDS 和一台 ECS。 ECS 用于部署本应用。

在两个 RDS 实例中分别执行 txc_sample1.sql 、 txc_sample2.sql 和 txc_undo_log.sql 完成建表。

操作步骤

GTS 提供了两种方法将 MQ 加入事务。

通过代码将 MQ 加入事务

通过样例将 MQ 加入事务

通过代码将 MQ 加入事务

在配置文档 xml 中配置好 MQ 资源,例如:

<bean class="com.ta obao.txc.client.aop.MTRelationShipManager">
    <property name="beanNames" ref="mtServicesClassList" />
    <property name="interceptorNames">
        <list>
            <value>mtBranchInterceptor</value>
        </list>
    </property>
    <property name="order" value="1"></property>
    <property name="proxyTargetClass" value="false"></property>
</bean>
<bean id="mtBranchInterceptor" class="com.taobao.txc.resourcemanager.mt.MtBranchInterceptor"/>
<bean id="mtServicesClassList" class="org.springframework.beans.factory.config.ListFactoryBean">
    <property name="sourceList">
        <list>
            <value>com.taobao.txc.rm.mq.TxcMQProducer</value>
        </list>
    </property>
</bean>
<bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl">
    <constructor-arg name="ProducerId" value="PID_txc_mq_prod_test"/>
    <constructor-arg name="AccessKey" value="XXX"/>
    <constructor-arg name="SecretKey" value="XXX"/>
</bean>
在代码中启动一个 MQ 资源,例如:

TxcMQProducer txcMQProducer = (TxcMQProducer) context.getBean("txc_mq_producer");
txcMQProducer.start();
System.out.println("Producer started!");
要实现 MQ 事务,需要在 GTS 注解函数范围内发用一个 MQ 消息,例如:

@TxcTransaction(appName = "myapp")
public void update(Connection connection1, Connection connection2) throws SQLException, MQClientException {        
    //数据源操作
    update1(connection1);
    update2(connection2);
    //创建一个消息
    Message msg = new Message(/*topic*/"txc_mq_test",
            /*tag*/"onePay",
            String.format("onePay message:success\n").getBytes());
    /* 发送一个消息,事务完成消息可见 */
    SendResult sendResult = txcMQProducer.send(null, 0, msg);
}
MQ 的 consumer 工程的配置与使用可以参考下面这个例子实现:

Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, "CID_txc_mq_prod_test");
properties.put(PropertyKeyConst.AccessKey, "XXX");
properties.put(PropertyKeyConst.SecretKey, "XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("txc_mq_test", "*", new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {
            log.info(String.format("SMSListener got message:%s", message));
            System.out.println(String.format("SMSListener got message:%s", message));
            System.out.println(new String(message.getBody()));
            return Action.CommitMessage;
        }
});
consumer.start();
通过样例将 MQ 加入事务

下载样例工程,解压,得到 sample-txc-mq 文件夹。

把该工程文件夹到 ECS 服务器上,找到 sample-txc-mq/src/main/resources 目录下的 txc-mq-client-context.xml 文件,对该文件中的两个数据源进行修改,分别替换为两个 RDS 的数据源。

将该文件中 <constructor-arg value="xxxxx"/> 的 xxxxx 替换为在步骤 1 中申请的 GTS 组 id。

将自己申请的 MQ 相关信息配置到该文件中,即将 <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl"> 下的参数替换为自己的 MQ 配置。

将该文件中 <property name=" accessKey" value="xxxxx"/><property name=" secretKey" value="xxxxx"/> 的 xxxxx 分别替换为您阿里云账户的 Access Key ID 和 Access Key Secret。

在 sample-txc-mq 目录下执行 build.sh 编译本工程。编译完成后在 sample-txc-mq/client/bin 目录下执行 run.sh 可以看到 MQ 的 provider 运行结果。

将 sample-mq-consumer 工程拷贝到 ECS 服务器中,在 sample-mq-consumer/src/main/java/com/taobao/txc/tests 中找到 SMSListener.java,修改其中的 xxxxx 为申请的 MQ 配置。在 sample-mq-consumer 目录下执行 build.sh 编译该工程,编译完成后 在sample-mq-consumer/client/bin 目录下执行 run.sh 可以 consumer 掉刚刚 sample-txc-mq 工程生产出来的 MQ 消息。

sample-txc-mq 工程的 Java 源代码在 /sample-txc-mq/src/main/java/com/taobao/txc/tests 目录下,可以根据业务需求修改。

注意事项

在部署 MQ 下的 GTS 应用时,需要在代码中配置 MQ 资源,包括 ProducerId、consumerId 和AccessKeys。

如果您还没有 MQ 资源,需要申请一个 MQ 资源,MQ 资源申请参考 步骤二:申请资源。

com.taobao.txc.client.aop.MTRelationShipManager 一定要先于com.taobao.txc.client.aop.TxcTransactionScaner 进行声明。

结果验证

通过上述 GTS 配置可以将 MQ 消息和多个数据源加入一个事务中,MQ 消息和多个数据源会保持强一致,一旦有异常返回给带 @TxcTransaction 注解的方法,都会导致这个全局事务数据回滚到之前的状态,同时 MQ 消息也不会发送出去。

展开
收起
猫饭先生 2017-10-31 11:05:57 1997 分享 版权
0 条回答
写回答
取消 提交回答