可靠消息最终一致性分布式事务实现_RocketMQ事务消息
RocketMQ是阿里巴巴开源的一款支持事务消息的消息中间件,于 2012年正式开源,2017年成为Apache基金会的顶级项目。
实现原理
RocketMQ 4.3版之后引入了完整的事务消息机制,其内部实现了完 整的本地消息表逻辑,使用RocketMQ实现可靠消息分布式事务就 不用用户再实现本地消息表的逻辑了,极大地减轻了开发工作量。
可靠消息最终一致性分布式事务实战_案列业务介绍
业务介绍
通过RocketMQ中间件实现可靠消息最终一致性分布式事务,模拟 商城业务中的下单扣减库存场景。订单微服务和库存微服务分别独立开发和部署。
流程
架构选型
数据库表设计
orders订单数据表
orders数据表存储于tx-msg-orders订单数据库。
DROP TABLE IF EXISTS `orders`; CREATE TABLE `order` ( `id` bigint(20) NOT NULL COMMENT '主键', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', `order_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '订单 编号', `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id', `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; CREATE TABLE `tx_log` ( `tx_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
stock库存数据表
DROP TABLE IF EXISTS `stock`; CREATE TABLE `stock` ( `id` bigint(20) NOT NULL COMMENT '主键id', `product_id` bigint(20) NULL DEFAULT NULL COMMENT '商品id', `total_count` int(11) NULL DEFAULT NULL COMMENT '商品总库存', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; -- ---------------------------- -- Table structure for tx_log -- ---------------------------- DROP TABLE IF EXISTS `tx_log`; CREATE TABLE `tx_log` ( `tx_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '分布式事务全局序列号', `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`tx_no`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
tx_log事务记录表
可靠消息最终一致性分布式事务实战_Docker安装 RocketMQ
在安装RocketMQ之前,我们先了解一下RocketMQ的部署架构,了 解一下RocketMQ的组件,然后基于当前主流的Docker安装 RocketMQ,我们这里安装单台RocketMQ,但为了防止单节点故 障、保障高可用,生产环境建议安装RocketMQ集群。
安装NameServer
拉取镜像
docker pull rocketmqinc/rocketmq
创建数据存储目录
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
启动NameServer
docker run -d \ --restart=always \ --name rmqnamesrv \ -p 9876:9876 \ -v /docker/rocketmq/data/namesrv/logs:/root/logs \ -v /docker/rocketmq/data/namesrv/store:/root/store \ -e "MAX_POSSIBLE_HEAP=100000000" \ rocketmqinc/rocketmq \ sh mqnamesrv
安装Broker
border配置:创建 broker.conf 配置文件
vim /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的 主从关系 brokerName = broker-a #0表示Master,大于0表示不同的 slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和 异步表示Master和Slave之间同步数据的机 制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷 盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状 态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址 brokerIP1 = 192.168.66.100 #剩余磁盘比例 diskMaxUsedSpaceRatio=99
启动broker
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq -4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
报错:
部署RocketMQ的管理工具
RocketMQ提供了UI管理工具,名为rocketmq-console,我们选择 docker安装
#创建并启动容器 docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=- Drocketmq.namesrv.addr=192.168.66.100:9876 - Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
关闭防火墙(或者开放端口)
#关闭防火墙 systemctl stop firewalld.service #禁止开机启动 systemctl disable firewalld.service
测试
访问:http://192.168.66.101:8080/#/ (可以切换中文)
可靠消息最终一致性分布式事务实战_实现订单微服务
创建父工程rocketmq-msg
创建订单微服务子工程
引入依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starterweb</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connectorjava</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-bootstarter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-bootstarter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
分布式事物【RocketMQ事务消息、Docker安装 RocketMQ、实现订单微服务、订单微服务业务层实现】(八)-全面详解(学习总结---从入门到深化)(下):https://developer.aliyun.com/article/1419991