基于MQ实现分布式事务

简介: 基于MQ实现分布式事务

问题场景

支付成功后一笔订单, 怎么能保证这笔订单的支付状态就一定修改了?

依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

配置生产者和消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RocketMQConfig {
    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQProducer producer() {
        DefaultMQProducer producer = new
                DefaultMQProducer("paymentGroup");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        return producer;
    }
  // 消费者的具体逻辑在Bean名为messageListener中
    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new
                DefaultMQPushConsumer("paymentConsumerGroup");
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe one more more topics to consume.
        consumer.subscribe("payment", "*");
        consumer.registerMessageListener(messageListener);
        return consumer;
    }
}

支付接口(生产者)

/**
     * 支付接口(支付成功,MQ发送支付订单orderId消息)
     * transactionManager = "tm131"表示使用哪个事务管理器, 因为项目中可能有多个 具体配置在下面有
     * @param userId
     * @param orderId
     * @param amount
     * @return 0:成功;1:用户不存在;2:余额不足
     */
    @Transactional(transactionManager = "tm131",rollbackFor = Exception.class)
    public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception {
        //支付操作
        AccountA accountA = accountAMapper.selectByPrimaryKey(userId);
        if (accountA == null) return 1;
        if (accountA.getBalance().compareTo(amount) < 0) return 2;
        accountA.setBalance(accountA.getBalance().subtract(amount));
        accountAMapper.updateByPrimaryKey(accountA);
        // 封装消息对象
        Message message = new Message();
        message.setTopic("payment");
        message.setKeys(orderId+"");
        message.setBody("订单已支付".getBytes());
        try {
            // 发送消息
            SendResult result = producer.send(message);
            if (result.getSendStatus() == SendStatus.SEND_OK){
                return 0;
            }else {
                throw new Exception("消息发送失败!");
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }

transactionManager = "tm131"具体配置

@Configuration
@MapperScan(value = "com.example.tccdemo.db131.dao",sqlSessionFactoryRef = "factoryBean131")
public class ConfigDb131 {
    @Bean("db131")
    public DataSource db131() {
        MysqlDataSource dataSource = new MysqlDataSource();
        dataSource.setUser("imooc");
        dataSource.setPassword("Imooc@123456");
        dataSource.setUrl("jdbc:mysql://192.168.73.131:3306/xa_131");
        return dataSource;
    }
    @Bean("factoryBean131")
    public SqlSessionFactoryBean factoryBean(@Qualifier("db131") DataSource dataSource) throws IOException {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db131/*.xml"));
        return factoryBean;
    }
    @Bean("tm131") // transactionManager = "tm131"指的就是这个事务管理器
    public PlatformTransactionManager transactionManager(@Qualifier("db131") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
}

订单状态修改(消费者)

// Bean名字要跟RocketMQConfig类中消费者@Qualifier("messageListener")保持一致
@Component("messageListener") 
public class ChangeOrderStatus implements MessageListenerConcurrently {
    @Resource
    private OrderMapper orderMapper;
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list == null || list.size()==0) return CONSUME_SUCCESS; // 消费成功
        for (MessageExt messageExt : list) {
            String orderId = messageExt.getKeys();
            String msg = new String(messageExt.getBody());
            System.out.println("msg="+msg);
            Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId));
            if (order==null) return RECONSUME_LATER; // 根据具体业务来, 这里再次消费
            try {
                order.setOrderStatus(1);//已支付
                order.setUpdateTime(new Date());
                order.setUpdateUser(0);//系统更新
                orderMapper.updateByPrimaryKey(order);
            }catch (Exception e){
                e.printStackTrace();
                return RECONSUME_LATER;
            }
        }
        return CONSUME_SUCCESS;
    }
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 存储 Kafka
如何保证MQ消息队列的高可用?
如何保证MQ消息队列的高可用?
213 0
|
8月前
|
消息中间件 Kafka RocketMQ
mq实现分布式事务
mq实现分布式事务
85 0
mq实现分布式事务
|
6月前
|
消息中间件 存储 Java
消息中间件第五讲:RocketMQ事务消息
消息中间件第五讲:RocketMQ事务消息
122 0
|
9月前
|
消息中间件
通过消息队列mq解决分布式事务问题的原理
通过消息队列mq解决分布式事务问题的原理
|
10月前
|
消息中间件 Java Spring
消息中间件MQ
个人理解
71 0
|
消息中间件 存储 中间件
分布式事务Seata【四】事务消息
本篇介绍本地消息表和MQ事务方案
287 0
分布式事务Seata【四】事务消息
|
消息中间件 Java 数据库
RocketMq- 分布式事务消息
RocketMq- 分布式事务消息
RocketMq- 分布式事务消息
|
消息中间件 存储 NoSQL
消息中间件之MQ详解及四大MQ比较
消息中间件之MQ详解及四大MQ比较
消息中间件之MQ详解及四大MQ比较
|
消息中间件 NoSQL Java
基于RocketMq解决分布式事务
基于RocketMq解决分布式事务
243 1
基于RocketMq解决分布式事务
|
消息中间件 存储 RocketMQ
RocketMQ-事务消息(分布式事务)
本文将阐述分布式事务的实现方式。
RocketMQ-事务消息(分布式事务)