阿里开源分布式事务Fescar demo示例及原理分析

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 分布式事务Fescar 使用及原理解析

@[toc]

简介

阿里巴巴近日开源了分布式事务中间件 fescar。GitHub地址是 https://github.com/alibaba/fescar
官方中文文档:https://github.com/alibaba/fescar/wiki/Home_Chinese
但是现在中文文档连接都不对,打不开,不知为何。

阿里巴巴现在内部使用的版本是GTS,fescar是其开源社区版本,fescar是Fast & EaSy Commit And Rollback, FESCAR的简称。更多简介请参考

==fescar架构简介参考: 阿里巴巴开源分布式事务解决方案 Fescar==

运行官方demo

https://github.com/alibaba/fescar克隆到本地IDEA,demo运行使用到的是examples和server。
image

我这里使用的是0.1.1版本:
image
运行程序首先需要配置数据库并且初始化SQL。
1、修改以下三个配置文件的MySQL地址
image

test module中的MySQL配置也改一下:
image

另外由于我本地的MySQL是8.X版本,所以驱动要升级。
image

我使用的JDK11,com.alibaba.fescar.tm.dubbo.impl.OrderServiceImpl有个BUG,运行时会报错,需要修改一下:
image

最后执行以下SQL语句初始化表:

DROP TABLE IF EXISTS `storage_tbl`;
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  PRIMARY KEY (`id`),
  UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT 0,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `money` int(11) DEFAULT 0,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `idx_unionkey` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
AI 代码解读

好了,准备工作做完。
实例简述:
该实例模拟一个下单功能,由businessService发起,首先扣减库存,然后创建订单。
image

启动server项目的com.alibaba.fescar.server.Server的main方法;
启动example项目的AccountServiceImpl、OrderServiceImpl、StorageServiceImpl三个类的main方法,也就是dubbo服务提供者;
以上4个服务启动完成后,查看DB中的记录,会初始化account_tbl和storage_tbl两张表,插入一条记录(左侧的表)
image

执行com.alibaba.fescar.tm.dubbo.impl.BusinessServiceImplmain方法。会发现执行报错,DB表数据没有变更。
是因为在com.alibaba.fescar.tm.dubbo.impl.BusinessServiceImpl#purchase方法中存在模拟的异常,我们将其注释掉,再次执行:
image

注释掉以后执行,可以发现没有报错,DB中的数据已经正确修改(参见上图的右侧三张表的数据)。

至此demo运行成功。

事务回滚原理简介

根据 阿里巴巴开源分布式事务解决方案 Fescar 介绍的原理,简单看看其rollback的原理。后续专门分析一下fescar的源码。

阿里巴巴开源分布式事务解决方案 Fescar 讲到它是优化了两阶段提交,减少锁的时间,利用本地事务真正提交事务,并且记录可用于回滚的日志,然后出错时根据日志回滚。

TransactionalTemplate是核心入口类;

/*
 *  Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.alibaba.fescar.tm.api;

import com.alibaba.fescar.core.exception.TransactionException;

/**
 * Template of executing business logic with a global transaction.
 */
public class TransactionalTemplate {

    /**
     * Execute object.
     *
     * @param business the business
     * @return the object
     * @throws TransactionalExecutor.ExecutionException the execution exception
     */
    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {

        // 1. get or create a transaction
        //获取全局事务管理器
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 2. begin transaction  事务begin
        try {
            tx.begin(business.timeout(), business.name());

        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }

        Object rs = null;
        try {

            // Do Your Business  执行我们具体的业务逻辑
            rs = business.execute();

        } catch (Throwable ex) {

            // 3. any business exception, rollback.  出错时回滚
            try {
                tx.rollback();

                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);

            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);

            }

        }

        // 4. everything is fine, commit.  事务提交
        try {
            tx.commit();

        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);

        }
        return rs;
    }

}
AI 代码解读

事务提交有两种:

public enum GlobalTransactionRole {

    /**
     * The Launcher.
     * 开启全局事务的发起者
     */
    // The one begins the current global transaction.
    Launcher,

    /**
     * The Participant.
     * 分支事务,也就是分布在各个系统中的本地事务
     */
    // The one just joins into a existing global transaction.
    Participant
}
AI 代码解读

通过代码可以看到,分支事务什么都不做,也就是直接提交本地事务。Launcher事务会进行全局事务的提交。
image

记录回滚日志的关键代码com.alibaba.fescar.rm.datasource.undo.UndoLogManager#flushUndoLogs中的undoLogContent

public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        assertDbSupport(cp.getDbType());

        ConnectionContext connectionContext = cp.getContext();
        String xid = connectionContext.getXid();
        long branchID = connectionContext.getBranchId();

        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchID);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

        String undoLogContent = UndoLogParserFactory.getInstance().encode(branchUndoLog);

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: " + undoLogContent);
        }

        PreparedStatement pst = null;
        try {
            pst = cp.getTargetConnection().prepareStatement(INSERT_UNDO_LOG_SQL);
            pst.setLong(1, branchID);
            pst.setString(2, xid);
            pst.setBlob(3, BlobUtils.string2blob(undoLogContent));
            pst.executeUpdate();
        } catch (Exception e) {
            if (e instanceof SQLException) {
                throw (SQLException) e;
            } else {
                throw new SQLException(e);
            }
        } finally {
            if (pst != null) {
                pst.close();
            }
        }

    }
AI 代码解读

这里根据上面的实例,查看其中一条日志:

{
    "branchId": 1890459,
    "sqlUndoLogs": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "keyType": "PrimaryKey",
                    "name": "ID",
                    "type": 4,
                    "value": 8
                }, {
                    "keyType": "NULL",
                    "name": "MONEY",
                    "type": 4,
                    "value": 199
                }]
            }],
            "tableName": "account_tbl"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "keyType": "PrimaryKey",
                    "name": "ID",
                    "type": 4,
                    "value": 8
                }, {
                    "keyType": "NULL",
                    "name": "MONEY",
                    "type": 4,
                    "value": 599
                }]
            }],
            "tableName": "account_tbl"
        },
        "sqlType": "UPDATE",
        "tableName": "account_tbl"
    }],
    "xid": "10.240.130.105:8091:1890457"
}
AI 代码解读

可以看到,日志中记录了修改之前和之后的数据变化情况,也就是数据镜像,回滚时就是根据这个进行回滚的。

由于现在fescar才刚刚开源,远没有达到商用,需要到1.0版本才可以线上使用。本文只是简单了解入门一下,后续在升级几个版本之后再详细的分析其源码。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
分布式锁—1.原理算法和使用建议
本文主要探讨了Redis分布式锁的八大问题,包括非原子操作、忘记释放锁、释放其他线程的锁、加锁失败处理、锁重入问题、锁竞争问题、锁超时失效及主从复制问题,并提供了相应的优化措施。接着分析了Redis的RedLock算法,讨论其优缺点以及分布式专家Martin对其的质疑。此外,文章对比了基于Redis和Zookeeper(zk)的分布式锁实现原理,包括获取与释放锁的具体流程。最后总结了两种分布式锁的适用场景及使用建议,指出Redis分布式锁虽有性能优势但模型不够健壮,而zk分布式锁更稳定但部署成本较高。实际应用中需根据业务需求权衡选择。
|
5月前
|
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
192 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
阿里云PAI发布DeepRec Extension,打造稳定高效的分布式训练,并宣布开源!
分布式 RPC 底层原理详解,看这篇就够了!
本文详解分布式RPC的底层原理与系统设计,大厂面试高频,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
分布式 RPC 底层原理详解,看这篇就够了!
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
分布式机器学习系统:设计原理、优化策略与实践经验
本文详细探讨了分布式机器学习系统的发展现状与挑战,重点分析了数据并行、模型并行等核心训练范式,以及参数服务器、优化器等关键组件的设计与实现。文章还深入讨论了混合精度训练、梯度累积、ZeRO优化器等高级特性,旨在提供一套全面的技术解决方案,以应对超大规模模型训练中的计算、存储及通信挑战。
362 4
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
378 0
分布式爬虫框架Scrapy-Redis实战指南
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
210 67
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
191 7
【📕分布式锁通关指南 02】基于Redis实现的分布式锁
本文介绍了从单机锁到分布式锁的演变,重点探讨了使用Redis实现分布式锁的方法。分布式锁用于控制分布式系统中多个实例对共享资源的同步访问,需满足互斥性、可重入性、锁超时防死锁和锁释放正确防误删等特性。文章通过具体示例展示了如何利用Redis的`setnx`命令实现加锁,并分析了简化版分布式锁存在的问题,如锁超时和误删。为了解决这些问题,文中提出了设置锁过期时间和在解锁前验证持有锁的线程身份的优化方案。最后指出,尽管当前设计已解决部分问题,但仍存在进一步优化的空间,将在后续章节继续探讨。
754 131
【📕分布式锁通关指南 02】基于Redis实现的分布式锁

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问