分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(上):https://developer.aliyun.com/article/1419990
编写配置文件
server:
  port: 9090
spring:
  application:
    name: tx-msg-order
  datasource:
    url: jdbc:mysql://192.168.66.100:3306/txmsg-order?useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
  name-server: 192.168.66.100:9876
  producer:
    group: order-group
编写主启动类
/**
 * Bootstrap class of the order microservice.
 *
 * <p>Starts the Spring Boot application, registers the MyBatis mappers found under
 * {@code com.tong.order.mapper}, and logs a banner line once startup has returned.
 */
@Slf4j
@MapperScan("com.tong.order.mapper")
@SpringBootApplication
public class OrderMain9090 {

    /**
     * Application entry point.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(OrderMain9090.class, args);
        // Reached only after the application context has started.
        log.info("************* 订单微服务启动成功*******");
    }
}
代码生成
package com.tong.utils;

import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;

import java.util.Arrays;
import java.util.List;

/**
 * One-off MyBatis-Plus code generator for the order microservice.
 *
 * <p>Generates entity, mapper and service sources for the {@code orders} and {@code tx_log}
 * tables into the {@code rocketmq-msg/orders} module.
 */
public class CodeGenerator {

    /**
     * Runs the generator.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // NOTE(review): the host and schema name here (192.168.66.102 / tx-msg-order) differ from
        // the datasource URL in the service configuration (192.168.66.100 / txmsg-order) - confirm
        // which one is correct.
        FastAutoGenerator.create("jdbc:mysql://192.168.66.102:3306/tx-msg-order", "root", "123456")
                .globalConfig(builder -> {
                    builder.author("tong") // author written into the generated file headers
                            .commentDate("MMdd") // date pattern used in generated comments
                            .outputDir(System.getProperty("user.dir") + "/rocketmq-msg/orders" + "/src/main/java/")
                            .fileOverride(); // overwrite files that already exist
                })
                // Package layout of the generated sources.
                .packageConfig(builder -> {
                    // Must match the package scanned by @MapperScan("com.tong.order.mapper") and
                    // imported by the service and listener classes; the previous value
                    // "com.tong.orders" generated code into a package nothing else referenced.
                    builder.parent("com.tong.order")
                            .entity("entity") // entity sub-package
                            .mapper("mapper") // mapper interface sub-package
                            .service("service"); // service sub-package
                })
                .strategyConfig(builder -> {
                    // Tables to generate code for.
                    builder.addInclude(Arrays.asList("orders", "tx_log"))
                            // Entity-specific options start here.
                            .entityBuilder()
                            // Generate Lombok-based entities.
                            .enableLombok()
                            // Table names: snake_case -> camelCase.
                            .naming(NamingStrategy.underline_to_camel)
                            // Column names: snake_case -> camelCase.
                            .columnNaming(NamingStrategy.underline_to_camel);
                })
                .execute();
    }
}
创建TxMessage类
在项目的com.tong.order.tx包下创建TxMessage类,主要用来封装实现分布式事务时,在订单微服务、RocketMQ消息中间件和库存微服务之间传递的全局事务消息,项目中会通过事务消息中的全局事务编号实现幂等。
/**
 * Global transaction message passed between the order microservice, RocketMQ and the
 * stock microservice.
 *
 * <p>Field order is significant: {@code @AllArgsConstructor} generates a constructor taking
 * {@code (productId, payCount, txNo)} in declaration order.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TxMessage implements Serializable {

    private static final long serialVersionUID = -4704980150056885074L;

    /** Product id. */
    private Long productId;

    /** Quantity of the product being purchased. */
    private Integer payCount;

    /** Global transaction number. */
    private String txNo;
}
可靠消息最终一致性分布式事务实战_订单微服务业务层实现
业务逻辑层主要实现了用户提交订单后的业务逻辑。
编写IOrderService接口
/**
 * Adds an order.
 *
 * @param productId product id
 * @param payCount  purchase quantity
 */
void save(Long productId,Integer payCount);

/**
 * Submits the order and saves the transaction information along with it.
 *
 * @param txMessage transaction message carrying the product id, purchase quantity and
 *                  global transaction number
 */
void submitOrderAndSaveTxNo(TxMessage txMessage);

/**
 * Submits an order.
 *
 * @param productId product id
 * @param payCount  purchase quantity
 */
void submitOrder(Long productId, Integer payCount);
编写OrderService接口实现
package com.tong.order.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.tong.order.entity.Order;
import com.tong.order.entity.TxLog;
import com.tong.order.mapper.OrderMapper;
import com.tong.order.mapper.TxLogMapper;
import com.tong.order.service.IOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.tong.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.UUID;

/**
 * Order service implementation.
 *
 * <p>{@link #submitOrder(Long, Integer)} sends a RocketMQ transactional message, and
 * {@link #submitOrderAndSaveTxNo(TxMessage)} performs the local database work for it: it
 * creates the order and records the global transaction number in the same transaction.
 *
 * <p>The package is {@code com.tong.order.service.impl}, matching the {@code com.tong.order.*}
 * packages this class imports from.
 *
 * @author tong
 * @since 05-20
 */
@Slf4j
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements IOrderService {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Resource
    private TxLogMapper txLogMapper;

    /**
     * Adds an order.
     *
     * @param productId product id
     * @param payCount  purchase quantity
     */
    @Override
    public void save(Long productId, Integer payCount) {
        Order order = new Order();
        // Order creation time.
        order.setCreateTime(LocalDateTime.now());
        // Order number: a random UUID with the dashes removed.
        order.setOrderNo(UUID.randomUUID().toString().replace("-",""));
        order.setProductId(productId);
        order.setPayCount(payCount);
        baseMapper.insert(order);
    }

    /**
     * Creates the order and stores the transaction log entry in one local transaction.
     *
     * <p>If a log entry with the message's transaction number already exists, the call only
     * logs that fact and returns without creating anything.
     *
     * @param txMessage transaction message carrying the product id, purchase quantity and
     *                  global transaction number
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void submitOrderAndSaveTxNo(TxMessage txMessage) {
        TxLog txLog = txLogMapper.selectById(txMessage.getTxNo());
        if (txLog != null) {
            log.info("订单微服务已经执行过事务,商品id为:{},事务编号为:{}",txMessage.getProductId(),txMessage.getTxNo());
            return;
        }
        // Create the order.
        this.save(txMessage.getProductId(), txMessage.getPayCount());
        // Build the transaction log entry keyed by the global transaction number.
        txLog = new TxLog();
        txLog.setTxNo(txMessage.getTxNo());
        txLog.setCreateTime(LocalDateTime.now());
        // Persist the transaction log entry.
        txLogMapper.insert(txLog);
    }

    /**
     * Submits an order by sending a transactional message.
     *
     * @param productId product id
     * @param payCount  purchase quantity
     */
    @Override
    public void submitOrder(Long productId, Integer payCount) {
        // Global transaction number for this submission.
        String txNo = UUID.randomUUID().toString();
        TxMessage txMessage = new TxMessage(productId, payCount, txNo);
        // The listener reads the message back from the "txMessage" key of this JSON envelope.
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("txMessage", txMessage);
        Message<String> message = MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        // Send the transactional message; it is not consumable yet.
        // "tx_order_group" is the transactional producer group.
        rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg", message, null);
    }
}
可靠消息最终一致性分布式事务实战_订单微服务监听事务消息
执行本地的业务代码
package com.tong.order.message;

import com.alibaba.fastjson.JSONObject;
import com.tong.order.entity.TxLog;
import com.tong.order.mapper.TxLogMapper;
import com.tong.order.service.IOrderService;
import com.tong.order.service.ITxLogService;
import com.tong.order.tx.TxMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * Listens for transactional messages of the {@code tx_order_group} producer group.
 *
 * @author tong
 * @version 1.0.0
 */
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {

    @Autowired
    private IOrderService orderService;

    @Resource
    private TxLogMapper txLogMapper;

    /**
     * Producer-side local transaction: runs the local business code and reports the outcome.
     *
     * <p>This method is deliberately not {@code @Transactional}. The transaction is owned by
     * {@code orderService.submitOrderAndSaveTxNo}. If this method opened an outer transaction,
     * a failure in the service would mark that shared transaction rollback-only; because the
     * exception is caught here, Spring would then fail the commit with
     * {@code UnexpectedRollbackException} instead of letting {@code ROLLBACK} be returned.
     *
     * @param msg the transactional message; its payload is the JSON envelope built by the sender
     * @param obj extra argument supplied by the sender (unused)
     * @return {@code COMMIT} when the local transaction succeeded, {@code ROLLBACK} when it threw
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
        try {
            log.info("订单微服务执行本地事务");
            TxMessage txMessage = this.getTxMessage(msg);
            // Run the local transaction (order + transaction log).
            orderService.submitOrderAndSaveTxNo(txMessage);
            log.info("订单微服务提交事务");
            // COMMIT: tells RocketMQ the message may now be consumed.
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // Keep the cause in the log instead of printing the stack trace to stderr.
            log.error("订单微服务回滚事务", e);
            // ROLLBACK: tells RocketMQ to delete the message.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    // checkLocalTransaction(Message), also required by RocketMQLocalTransactionListener, is
    // listed in the following section of the article.

    /**
     * Extracts the {@link TxMessage} stored under the {@code "txMessage"} key of the payload.
     *
     * @param msg message whose payload is a {@code byte[]} holding JSON text
     * @return the deserialized transaction message
     */
    private TxMessage getTxMessage(Message msg) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        String messageString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String txStr = jsonObject.getString("txMessage");
        return JSONObject.parseObject(txStr, TxMessage.class);
    }
}
网络异常消息处理
/** * 因为网络异常或其他原因时,RocketMQ的消息状态处于UNKNOWN时,调用该方法检查Producer的本地 * 事务是否已经执行成功, * 成功返回: RocketMQLocalTransactionState.COMMIT, * 失败返回:RocketMQLocalTransactionState.ROLLBACK */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { log.info("订单微服务查询本地事务"); TxMessage txMessage = this.getTxMessage(msg); // 获取订单的消息 Integer exists = txLogService.isExistsTx(txMessage.getTxNo()); if (exists != null) { // COMMIT:即生产者通知Rocket该消息可以消费 return RocketMQLocalTransactionState.COMMIT; } // UNKNOWN:即生产者通知Rocket继续查询该消息的状态 return RocketMQLocalTransactionState.UNKNOWN; } private TxMessage getTxMessage(Message msg) { String messageString = new String((byte[]) msg.getPayload()); JSONObject jsonObject = JSONObject.parseObject(messageString); String txStr = jsonObject.getString("txMessage"); return JSONObject.parseObject(txStr,TxMessage.class); }
可靠消息最终一致性分布式事务实战_实现库存微服务
创建库存微服务tx-msg-stock
引入依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
编写配置文件
server:
  port: 6060
spring:
  application:
    name: tx-msg-stock
  datasource:
    url: jdbc:mysql://192.168.66.100:3306/txmsg-stock?useUnicode=true&characterEncoding=UTF8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
  name-server: 192.168.66.100:9876
编写主启动类
package com.tong.stock;

import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Bootstrap class of the stock microservice.
 *
 * <p>Starts the Spring Boot application, registers the MyBatis mappers found under
 * {@code com.tong.stock.mapper}, and logs a banner line once startup has returned.
 *
 * @author tong
 * @version 1.0.0
 */
@MapperScan("com.tong.stock.mapper")
@Slf4j
@SpringBootApplication
public class StockServerStarter {

    /**
     * Application entry point.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(StockServerStarter.class, args);
        // Reached only after the application context has started.
        log.info("**************** 库存服务启动成功 ***********");
    }
}
代码生成
package com.tong.utils;

import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.rules.NamingStrategy;

import java.util.Arrays;
import java.util.List;

/**
 * One-off MyBatis-Plus code generator for the stock microservice.
 *
 * <p>Generates entity, mapper and service sources for the {@code stock} and {@code tx_log}
 * tables into the {@code rocketmq-msg/stock} module under the {@code com.tong.stock} package.
 */
public class CodeGenerator {

    /**
     * Runs the generator.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String jdbcUrl = "jdbc:mysql://192.168.66.102:3306/tx-msg-stock";
        // Source root of the stock module, resolved against the working directory.
        final String sourceRoot = System.getProperty("user.dir") + "/rocketmq-msg/stock" + "/src/main/java/";
        // Tables to generate code for.
        final List<String> tables = Arrays.asList("stock", "tx_log");

        FastAutoGenerator.create(jdbcUrl, "root", "123456")
                // Author, comment date pattern, output directory; existing files are overwritten.
                .globalConfig(global -> global
                        .author("tong")
                        .commentDate("MMdd")
                        .outputDir(sourceRoot)
                        .fileOverride())
                // Package prefix plus the entity / mapper / service sub-packages.
                .packageConfig(packages -> packages
                        .parent("com.tong.stock")
                        .entity("entity")
                        .mapper("mapper")
                        .service("service"))
                // Lombok entities; table and column names converted from snake_case to camelCase.
                .strategyConfig(strategy -> strategy
                        .addInclude(tables)
                        .entityBuilder()
                        .enableLombok()
                        .naming(NamingStrategy.underline_to_camel)
                        .columnNaming(NamingStrategy.underline_to_camel))
                .execute();
    }
}
编写库存接口
/**
 * Stock service operations.
 */
public interface StockService {

    /**
     * Looks up stock by id.
     *
     * @param id id to look up
     *           (NOTE(review): not visible here whether this is the stock row id or the
     *           product id - confirm against the implementation)
     * @return the matching stock record
     */
    Stock getStockById(Long id);

    /**
     * Deducts stock.
     *
     * @param txMessage transaction message describing the deduction
     */
    void decreaseStock(TxMessage txMessage);
}