微服务应用集成MQ
- 引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>
- 配置文件配置
rocketmq: name-server: xxxx:9876 producer: group: base_group_syncMsg send-message-timeout: 5000 retry-times-when-send-failed: 2 max-message-size: 4194304
bank1 应用实现
提供请求api
@GetMapping(value = "/rocketmq") public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){ //创建事务id,作为消息内容发到mq String tx_no = UUID.randomUUID().toString(); //封装事件实体 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no); //发送消息 accountInfoService.sendUpdateAccountBalance(accountChangeEvent); return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}"; }
扣款请求
发送消息到MQ
/** * 向mq发送转账消息 * @param accountChangeEvent 事件实体 */ @Override public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) { //将accountChangeEvent转成json JSONObject jsonObject =new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); String jsonString = jsonObject.toJSONString(); //生成message类型 Message<String> message = MessageBuilder.withPayload(jsonString).build(); //发送一条事务消息 /** * String txProducerGroup 生产组 * String destination 主题, * Message<?> message, 消息内容 * Object arg 参数 */ rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null); }
监听MQ返回
/** * @author 小隐乐乐 * @date 2021/06/3 * @description 消费者监听 */ @Slf4j @Component @RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}") public class ConsumerListener implements RocketMQListener<String> { /** * 注入业务实现 */ @Autowired AccountInfoService accountInfoService; /** * 接收消息 */ @Override public void onMessage(String message) { log.info("获取到的消费消息:{}",message); //解析 JSONObject jsonObject = JSONObject.parseObject(message); String accountChangeString = jsonObject.getString("accountChange"); //转成AccountChangeEvent对象 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //设置账号 accountChangeEvent.setAccountNo("2"); //执行业务操作---增加金额 accountInfoService.addAccountInfoBalance(accountChangeEvent); } }
实现本地业务逻辑
/** * @author 小隐乐乐 * @date 2021/06/3 * @description 账户业务实现 */ @Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; //更新账户--增加金额 @Override @Transactional public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) { log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); //本地读取事务 防止重复消费 if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){ return ; } //插入数据--增加金额 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount()); //添加事务记录,用于幂等 accountInfoDao.addTx(accountChangeEvent.getTxNo()); //预留错误演示 if(accountChangeEvent.getAmount() == 250){ throw new RuntimeException("消息处理异常"); } } }
bank1 事务回调监听
/** * @author 小隐乐乐 * @date 2021/06/3 * @description 生产者事务回调监听器 */ @Component @Slf4j @RocketMQTransactionListener(txProducerGroup = "producer_group_bank1") public class ProducerCallbackListener implements RocketMQLocalTransactionListener { @Autowired AccountInfoService accountInfoService; @Autowired AccountInfoDao accountInfoDao; /** * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调 * @param message 消息 * @return */ @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { //解析消息 String messageString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); //转成AccountChangeEvent实体 String accountChangeString = jsonObject.getString("accountChange"); //将accountChange(json)转成AccountChangeEvent AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //执行本地事务,扣减金额 accountInfoService.doUpdateAccountBalance(accountChangeEvent); //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); //向mq发送ROLLBACK,mq将消息的状态依旧无法消费 return RocketMQLocalTransactionState.ROLLBACK; } } /** * 事务状态回查,查询是否扣减金额 * @param message 消息 * @return */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { //解析message,转成AccountChangeEvent String messageString = new String((byte[]) message.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String accountChangeString = jsonObject.getString("accountChange"); //将accountChange(json)转成AccountChangeEvent AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //事务id String txNo = accountChangeEvent.getTxNo(); int existTx = accountInfoDao.isExistTx(txNo); if(existTx>0){ return RocketMQLocalTransactionState.COMMIT; }else{ return RocketMQLocalTransactionState.UNKNOWN; } } }
bank2 应用实现
bank2 监听MQ
/** * @author 小隐乐乐 * @date 2021/06/3 * @description 消费者监听 */ @Slf4j @Component @RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group") public class ConsumerListener implements RocketMQListener<String> { /** * 注入业务实现 */ @Autowired AccountInfoService accountInfoService; /** * 接收消息 */ @Override public void onMessage(String message) { log.info("获取到的消费消息:{}",message); //解析 JSONObject jsonObject = JSONObject.parseObject(message); String accountChangeString = jsonObject.getString("accountChange"); //转成AccountChangeEvent对象 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class); //设置账号 accountChangeEvent.setAccountNo("2"); //执行业务操作---增加金额 accountInfoService.addAccountInfoBalance(accountChangeEvent); } }
消息消费
log.info("获取到的消费消息:{}",message); //解析 JSONObject jsonObject = JSONObject.parseObject(message); String accountChangeString = jsonObject.getString("accountChange"); //转成AccountChangeEvent对象 AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
执行本地扣款事务
accountInfoService.addAccountInfoBalance(accountChangeEvent);
总结
终于搞完了,写demo还是很费事的。
分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。
躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!