COLA-statemachine事务失效踩坑

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: cola-statemachine是阿里开源项目COLA中的轻量级状态机组件。最大的特点是无状态、采用纯Java实现,用Fluent Interface(连贯接口)定义状态和事件,可用于管理状态转换场景。比如:订单状态、支付状态等简单有限状态场景。在实际使用的过程中笔者曾发现状态机内事务不生效的问题,经过排查得到解决,以此记录一下

背景

cola-statemachine是阿里开源项目COLA中的轻量级状态机组件。最大的特点是无状态、采用纯Java实现,用Fluent Interface(连贯接口)定义状态和事件,可用于管理状态转换场景。比如:订单状态、支付状态等简单有限状态场景。在实际使用的过程中我曾发现状态机内事务不生效的问题,经过排查得到解决,以此记录一下。

问题场景

一个简单的基于cola的状态机可能如下

  • 创建状态机
public StateMachine<State, Event, Context> stateMachine() {
   
    StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();
    builder.externalTransition().from(State.TEST).to(State.DEPLOY)
            .on(Event.PASS)
            .when(passCondition())
            .perform(passAction());
    return builder.build("testMachine");
}

上述代码翻译过来是

State.TEST状态转化到State.DEPLOY状态,在Event.PASS事件下,当满足passCondition()条件时,执行passAction()内的逻辑

  • 执行状态机
/**
 * 根据当前状态、事件、上下文,进行状态流转
 *
 * @param State 当前状态
 * @param Event 当前事件
 * @param Context 当前上下文
 */
public void fire(State state, Event event, Context context) {
   
    StateMachine<State, Event, Context> stateMachine = StateMachineFactory.get("testMachine");
    stateMachine.fireEvent(state, event, context);
}

上述代码在纯Java环境可以很好的运行,一般来说,开发者会进一步结合Spring来完善多个状态机的获取

过程中通常会将状态机进行@Bean注入,将passCondition()passAction()独立出Service以期望在后续操作中更好的利用Spring的特性

简单改造后的状态机代码可能如下

@Component
public class StateMachine {
   

    @Autowired
    private ConditionService conditionService;

    @Autowired
    private ActionService actionService;

    @Bean
    public StateMachine<State, Event, Context> stateMachine() {
   
        StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();
        builder.externalTransition().from(State.TEST).to(State.DEPLOY)
                .on(Event.PASS)
                .when(conditionService.passCondition())
                .perform(actionService.passAction());
        return builder.build("testMachine");
    }
}

假设ConditionService的实现为

当上下文不为空就满足条件,为空则不满足条件

@Service
public class ConditionServiceImpl implements ConditionService {
   

    /**
     * 通过条件
     *
     * @return Condition
     */
    @Override
    public Condition<Context> passCondition() {
   
        return context -> {
   
            if (context!=null) {
   
                return true;
            }
            return false;
        };
    }

假设ActionService的实现为

更新金额,同时更新状态,之后推送通知事件进行后续异步操作

@Service
public class ActionServiceImpl implements ActionService {
   

    @Autowired
    private PriceManager priceManager;

    @Autowired
    private StatusManager statusManager;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    /**
     * 通过执行动作
     *
     * @return Action
     */
    @Override
    public Action<State, Event, Context> passAction() {
   
        return (from, to, event, context) -> {
   
            priceManager.updatePrice(context.getPrice());
            statusManager.updateStatus(to.getCode());
            NoticeEvent noticeEvent = context.toNoticeEvent();
            applicationEventPublisher.publishEvent(noticeEvent);
        };
    }
}

NoticeListener监听者

假设这里只是记录操作日志

@Component
public class NoticeListener {
   

    @Autowired
    private LogManager logManager;

    @Async(value = "EventExecutor")
    @EventListener(classes = NoticeEvent.class)
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void noticeEventAction(NoticeEvent noticeEvent) {
   
        logManager.log(noticeEvent);
    }
}

上述代码正常运行时没有问题,但这时候有的同学就会想到,想要金额和状态的更新具有一致性,不能更新了金额之后更新状态失败了。

想要保证两个操作的一致性,最简单的方式就是加上@Transactional注解,使得两个操作要么一起成功,要么一起失败

于是ActionService的代码在改动后可能是这样的

@Service
public class ActionServiceImpl implements ActionService {
   

    @Autowired
    private PriceManager priceManager;

    @Autowired
    private StatusManager statusManager;

    /**
     * 通过执行动作
     *
     * @return Action
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Action<State, Event, Context> passAction() {
   
        return (from, to, event, context) -> {
   
            priceManager.updatePrice(context.getPrice());
            statusManager.updateStatus(to.getCode());
            NoticeEvent noticeEvent = context.toNoticeEvent();
            applicationEventPublisher.publishEvent(noticeEvent);
        };
    }
}

对应的NoticeListener改为@TransactionalEventListener,以适应在上文事务提交后再执行

@Component
public class NoticeListener {
   

    @Autowired
    private LogManager logManager;

    @Async(value = "EventExecutor")
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = NoticeEvent.class)
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void noticeEventAction(NoticeEvent noticeEvent) {
   
        logManager.log(noticeEvent);
    }
}

修改完成后在单测中发现了2个现象

  1. 如果其中一个更新失败了,另外一个并没有回滚
  2. 如果两个都没有更新失败,NoticeListener并没有成功监听到事件

在确认ActionServiceNoticeListener无配置遗漏的地方,无典型事务失效场景,搜索半天@TransactionalEventListener监听不起作用的原因无果后,我又仔细检查了StateMachine类中whenperform的调用,也都是通过@Autowired的类进行调用的,没有产生AOP的自调用问题。代码改造后看起来很正常,按理来说不应该出现这个问题。

在百思不得其解的时候,我发现本地的日志输出稍微和平时有些不一样,在执行上述Action逻辑时,没有mybatis-plus的事务相关日志。于是想到可能@Transactional根本没有切到Action方法。

再仔细扫了眼Action逻辑可以看出写法是采用的匿名方法形式

@Override
@Transactional(rollbackFor = Exception.class)
public Action<State, Event, Context> passAction() {
   
    return (from, to, event, context) -> {
   
        priceManager.updatePrice(context.getPrice());
        statusManager.updateStatus(to.getCode());
    };
}

实际上非匿名方法写法等价于

@Override
@Transactional(rollbackFor = Exception.class)
public Action<State, Event, Context> passAction() {
   
    Action<State, Event, Context> action = new Action<>() {
   
        @Override
        public void execute(State from, State to, Event event, Context context) {
   
            priceManager.updatePrice(context.getPrice());
            statusManager.updateStatus(to.getCode());
        }
    }
    return action;
}

可以看到匿名方法实际为execute

我在状态机的使用过程中并没有直接调用该方法,所以只能是由框架内部调用的。

问题剖析

重新回到状态机开始执行的地方

public void fire(State state, Event event, Context context) {
   
    StateMachine<State, Event, Context> stateMachine = StateMachineFactory.get("testMachine");
    stateMachine.fireEvent(state, event, context);
}

跟进去fireEvent方法,可以看到第36行判断当前的状态、时间、上下文是否能够转移,如果能够进行转移则进入到第43

statemachine-1.png

之后便是校验的逻辑,当我们的action不为空的时候,便执行91行的action.execute()

statemachine-2.png

这时候我们可以看到此时的action实际上就是ActionSeriveImpl,而真正的execute实现也在ActionSeriveImpl中,于是产生了AOP自调用问题,由于无法获取到代理对象事务切面自然就不会生效了

这里的action变量则是由状态机定义时所赋值的,点击setAction方法,全局只有2个地方使用到了,一个在批量的状态流转的实现类中,一个在单个的状态流转的实现类中

statemachine-3.png

批量流转

@Override
public void perform(Action<S, E, C> action) {
   
    for(Transition transition : transitions){
   
        transition.setAction(action);
    }
}

单个流转

@Override
public void perform(Action<S, E, C> action) {
   
    transition.setAction(action);
}

代码很简单,注意函数签名都为perform,这就是状态机定义时的连贯接口

@Bean
public StateMachine<State, Event, Context> stateMachine() {
   
    StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();
    builder.externalTransition().from(State.TEST).to(State.DEPLOY)
        .on(Event.PASS)
        .when(conditionService.passCondition())
        .perform(actionService.passAction());
    return builder.build("testMachine");
}

在这里actionService.passAction()看上去是一次service调用,实际上并没有实际调用execute方法

passAction的接口定义为Action<State, Event, Context>,这里仅仅是将定义好的action函数通过perform接口赋值到状态机内部而已。真正的执行,需要在fireEvent之后。

解决方法

在了解了问题所在之后,便是想办法进行解决。

通常来说一个AOP自调用的解决方法可以为如下2点

  1. 在自调用类中注入自己(仅限低版本Springboot,在高版本中会有循环依赖检测)
  2. 采用AopContext.currentProxy()获取当前类的代理对象,用代理对象进行自身方法的调用

很可惜,两种方法在当前场景都不适用,因为自调用在COLA框架内部,如果为了解决这个问题去再包装框架就有点大动干戈了。

方法一

既然没有声明式事务,直接采用编程式事务就好了

改进后的Action代码如下

@Service
public class ActionServiceImpl implements ActionService {
   

    @Autowired
    private PriceManager priceManager;

    @Autowired
    private StatusManager statusManager;

    @Autowired
    private DataSourceTransactionManager dataSourceManager;

    /**
     * 通过执行动作
     *
     * @return Action
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Action<State, Event, Context> passAction() {
   
        return (from, to, event, context) -> {
   
            TransactionStatus begin = dataSourceManager.getTransaction(new DefaultTransactionAttribute());
            try {
   
                priceManager.updatePrice(context.getPrice());
                statusManager.updateStatus(to.getCode());
                NoticeEvent noticeEvent = context.toNoticeEvent();
                applicationEventPublisher.publishEvent(noticeEvent);
                dataSourceManager.commit(begin);
            } catch (Exception e) {
   
                dataSourceManager.rollback(begin);
            }
        };
    }
}

需要注意的是,applicationEventPublisher.publishEvent(noticeEvent);需要放在dataSourceManager.commit(begin);前,这样@TransactionalEventListener才能正确监听到,如果放在commit之后,上文事务会做完提交和释放SqlSession的动作,后续的监听者无法监听一个已释放的事务。

对应的控制台日志为

Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@295854a]

方法二

回顾上面的状态机定义,我假定的是你是这样实现的状态机

@Component
public class StateMachine {
   

    @Autowired
    private ConditionService conditionService;

    @Autowired
    private ActionService actionService;

    @Bean
    public StateMachine<State, Event, Context> stateMachine() {
   
        StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();
        builder.externalTransition().from(State.TEST).to(State.DEPLOY)
                .on(Event.PASS)
                .when(conditionService.passCondition())
                .perform(actionService.passAction());
        return builder.build("testMachine");
    }
}

其中ConditionService和ActionService定义了Contion和Action接口的返回,然后在内部实现了匿名类

public interface ConditionService {

    Condition<AuditContext> passOrRejectCondition();

    Condition<AuditContext> doneCondition();
}
public interface ActionService {
   

    Action<AuditState, AuditEvent, AuditContext> passOrRejectAction();

    Action<AuditState, AuditEvent, AuditContext> doneAction();
}

但其实正确的做法是直接实现Condition或Action的接口,将实现类定义为Bean,传递这个Bean到状态机定义中,从根本上解决事务失效问题

上述代码应该转化为

@Component
public class StateMachine {
   

    @Resource
    @Qualifier("conditionImpl")
    private Condition<Context> conditionImpl;

    @Resource
    @Qualifier("actionImpl")
    private Action<State, Event, Context> actionImpl;

    @Bean
    public StateMachine<State, Event, Context> stateMachine() {
   
        StateMachineBuilder<State, Event, Context> builder = StateMachineBuilderFactory.create();
        builder.externalTransition().from(State.TEST).to(State.DEPLOY)
                .on(Event.PASS)
                .when(conditionImpl)
                .perform(actionImpl);
        return builder.build("testMachine");
    }
}

对应的接口实现

@Component
public class ConditionImpl implements Condition<Context> {
   

    @Override
    public boolean isSatisfied(Context context) {
   
        return false;
    }
}
@Component
public class ActionImpl implements Action<State, Event, Context> {
   

    @Override
    @Transactional
    public void execute(State from, State to, Event event, Context context) {
   

    }
}

由于传递的直接是Bean,所以就不再存在匿名类自调用的问题,在Action或Condition的实现方法executeisSatisfied上增加@Transactional即可让事务生效

总结

有的时候Spring代码写多了,看起来代码和平时没区别,实际上在特殊场景还是会踩坑,当事务和其他框架结合时一定要注意潜在的事务问题,做好单元测试。

另外,状态机具有天生幂等的特点,不仅仅可以用于这种场景重Condition或Action的场景,在DDD中它可以作为维护某个状态的方法,用于充血模型

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
Java Spring
SpringBoot中事务执行原理分析(三)
SpringBoot中事务执行原理分析(三)
133 0
|
6月前
|
数据库
MyBatisPlus-乐观锁概念及实现步骤
MyBatisPlus-乐观锁概念及实现步骤
123 0
|
11月前
|
XML 设计模式 Java
SpringBoot中事务执行原理分析(一)
SpringBoot中事务执行原理分析(一)
105 1
|
6月前
|
SQL Java 数据库连接
SpringBoot中事务执行原理分析(六)
SpringBoot中事务执行原理分析(六)
169 0
|
6月前
|
Java 关系型数据库 MySQL
SpringBoot中事务执行原理分析(五)
SpringBoot中事务执行原理分析(五)
60 0
|
Java Spring
SpringBoot中事务执行原理分析补充篇
SpringBoot中事务执行原理分析补充篇
65 0
|
Java Spring
SpringBoot中事务执行原理分析(二)
SpringBoot中事务执行原理分析(二)
58 0
|
Java 数据库连接 数据库
SpringBoot中事务执行原理分析(四)
SpringBoot中事务执行原理分析(四)
91 0
|
Java Spring 容器
Spring 事务失效的常见八大场景,注意避坑
Spring 事务失效的常见八大场景,注意避坑
335 1
|
XML Java 数据库
Spring 事务传播机制、隔离级别以及事务执行流程源码结合案例分析(下)
Spring 事务传播机制、隔离级别以及事务执行流程源码结合案例分析(下)
130 0
Spring 事务传播机制、隔离级别以及事务执行流程源码结合案例分析(下)