问题场景
一笔订单支付成功后, 怎么保证这笔订单的支付状态一定会被修改?
依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
配置生产者和消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RocketMQConfig { @Bean(initMethod = "start",destroyMethod = "shutdown") public DefaultMQProducer producer() { DefaultMQProducer producer = new DefaultMQProducer("paymentGroup"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); return producer; } // 消费者的具体逻辑在Bean名为messageListener中 @Bean(initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer consumer(@Qualifier("messageListener") MessageListenerConcurrently messageListener) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("paymentConsumerGroup"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); // Subscribe one more more topics to consume. consumer.subscribe("payment", "*"); consumer.registerMessageListener(messageListener); return consumer; } }
支付接口(生产者)
/** * 支付接口(支付成功,MQ发送支付订单orderId消息) * transactionManager = "tm131"表示使用哪个事务管理器, 因为项目中可能有多个 具体配置在下面有 * @param userId * @param orderId * @param amount * @return 0:成功;1:用户不存在;2:余额不足 */ @Transactional(transactionManager = "tm131",rollbackFor = Exception.class) public int pamentMQ(int userId, int orderId, BigDecimal amount) throws Exception { //支付操作 AccountA accountA = accountAMapper.selectByPrimaryKey(userId); if (accountA == null) return 1; if (accountA.getBalance().compareTo(amount) < 0) return 2; accountA.setBalance(accountA.getBalance().subtract(amount)); accountAMapper.updateByPrimaryKey(accountA); // 封装消息对象 Message message = new Message(); message.setTopic("payment"); message.setKeys(orderId+""); message.setBody("订单已支付".getBytes()); try { // 发送消息 SendResult result = producer.send(message); if (result.getSendStatus() == SendStatus.SEND_OK){ return 0; }else { throw new Exception("消息发送失败!"); } } catch (Exception e) { e.printStackTrace(); throw e; } }
transactionManager = "tm131"具体配置
@Configuration @MapperScan(value = "com.example.tccdemo.db131.dao",sqlSessionFactoryRef = "factoryBean131") public class ConfigDb131 { @Bean("db131") public DataSource db131() { MysqlDataSource dataSource = new MysqlDataSource(); dataSource.setUser("imooc"); dataSource.setPassword("Imooc@123456"); dataSource.setUrl("jdbc:mysql://192.168.73.131:3306/xa_131"); return dataSource; } @Bean("factoryBean131") public SqlSessionFactoryBean factoryBean(@Qualifier("db131") DataSource dataSource) throws IOException { SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean(); factoryBean.setDataSource(dataSource); ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver(); factoryBean.setMapperLocations(resourceResolver.getResources("mybatis/db131/*.xml")); return factoryBean; } @Bean("tm131") // transactionManager = "tm131"指的就是这个事务管理器 public PlatformTransactionManager transactionManager(@Qualifier("db131") DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } }
订单状态修改(消费者)
// Bean名字要跟RocketMQConfig类中消费者@Qualifier("messageListener")保持一致 @Component("messageListener") public class ChangeOrderStatus implements MessageListenerConcurrently { @Resource private OrderMapper orderMapper; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if (list == null || list.size()==0) return CONSUME_SUCCESS; // 消费成功 for (MessageExt messageExt : list) { String orderId = messageExt.getKeys(); String msg = new String(messageExt.getBody()); System.out.println("msg="+msg); Order order = orderMapper.selectByPrimaryKey(Integer.parseInt(orderId)); if (order==null) return RECONSUME_LATER; // 根据具体业务来, 这里再次消费 try { order.setOrderStatus(1);//已支付 order.setUpdateTime(new Date()); order.setUpdateUser(0);//系统更新 orderMapper.updateByPrimaryKey(order); }catch (Exception e){ e.printStackTrace(); return RECONSUME_LATER; } } return CONSUME_SUCCESS; } }