基于事件流的轻量级异常容错设计—支持接口可重入

简介: ## 写在前面 在我们平时的业务代码中,最常见的代码结构就是外部的请求打过来,首先进行必要的参数校验,接着根据参数对关联实体的状态进行校验,然后再校验业务逻辑,最后推进关联实体的状态。下面以一段代码简单示例一下 ```java pulic class ReentryServiceImpl implements ReentryService { publi

写在前面

在我们平时的业务代码中,最常见的代码结构就是外部的请求打过来,首先进行必要的参数校验,接着根据参数对关联实体的状态进行校验,然后再校验业务逻辑,最后推进关联实体的状态。下面以一段代码简单示例一下

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(String entityName) {
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       entity1Service.finishEntity1(entity);
       entity2Service.createEntity2(entity.getEntity2());
       entity3Service.rollBackEntity3(entity.getEntity3());
    }
    
}

在这种代码当中,如果后面三行状态推进的服务调用失败,如果缺乏适当的异常处理机制的话,就会导致整个服务不可用,在bizHandle(String)方法重试的时候,可能由于entity1Service.finishEntity1(entity);调用成功导致方法入口处的状态校验就失败了,接着方法直接返回(或者抛出异常),最终造成关联实体的状态不一致。

当然,对于这种情况,由于方法的入口处的校验是针对entity1来做的,那么在后续状态推进的过程中,调整状态推进的顺序,把entity1的状态推进放在最后来做,然后基于服务调用的幂等性,其实也可以保证重试时的可重入。但是这种做法过度依赖业务场景,不具备通用的场景处理能力哦。如果此处的校验并不是针对entity1来做的,而是对entity1,entity2,entity3来做的联合的状态校验,那么不管如何调整后续状态推进的顺序,都是无法保证重试的可重入的。

因此,凌驾于具体场景之上,我希望做到的是,编写业务代码的时候,可以尽量降低开发人员的负担,使用一种统一的处理机制来保证接口重试时的可重入能力。

服务与关联事件

通过上面的概述,相信大家已经知道我要做一件什么样的事了——异常情况下支持接口重试时的可重入机制。用白话说就是在接口第一次调用失败的情况下,后续进行重试,为了避免重试时状态校验失败而导致重试失败,我需要知道上一次服务调用的时候,方法执行到了哪一步,然后,可以直接从这一步开始继续往下执行(为了保证业务数据正确,所有的外部服务接口都需要支持幂等)。

为了达到这一目的,自然想到的就是使用一个列表或者队列把所有的服务调用组织起来,比如上述例子中的:

entity1Service.finishEntity1(entity);
entity2Service.createEntity2(entity.getEntity2());
entity3Service.rollBackEntity3(entity.getEntity3());

并且,必须保证这些服务调用的先后顺序是确定的(任意一次重试的时候,看到的这三个服务调用顺序都是一致的)。首先我把这些服务调用都当做是ReentryServiceImpl.bizHandle(String)方法执行过程中的事件,在做bizHandle的时候,需要依次去执行entity1Service.finishEntity1(entity)、entity2Service.createEntity2(entity.getEntity2())和entity3Service.rollBackEntity3(entity.getEntity3())。因此,首先定义一个通用事件接口:

public interface Event {

    /**
     * 执行本次事件的任务
     *
     * @throws Exception 异常
     */
    void handle(Context context) throws Exception;

    /**
     * 倒序执行本次事件,当前事件的所有后续事件都执行完了,执行该方法
     * @throws Exception 异常
     */
    void postHandle(Context context) throws Exception;
}

有了这个接口之后,就可以对上述服务中的三个事件进行定义了。依次为:

@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 1)
public class FinishEntity1Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity1Service.finishEntity1(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 2)
public class CreateEntity2Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity2Service.createEntity2(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 3)
public class RollBackEntity3Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity3Service.rollBackEntity3(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}

好了,到此三个事件定义好了,估计大家看到在事件上打了个注解@RelatedService,先给出代码:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Component
public @interface RelatedService {

    /**
     * 事件关联的服务 {@link Event}
     *
     * @return 一般为服务的bean name
     */
    String serviceName() default "";

    /**
     *
     * @return 调用事件的服务方法名
     */
    String methodName() default "";

    /**
     * 事件在服务中的执行顺序, 值越小,执行顺序越靠前
     * @return 优先级
     */
    int order() default 0;

}

在这个注解上总共有三个参数,分别为:

  • order:可以定义事件执行先后顺序,值越小,执行顺序越靠前
  • serviceName:事件执行时所在服务名,一般为bean的名字,在上面这个例子中就是:ReentryService
  • methodName:执行事件的方法名,用户自定义的,在上面的例子中可以是:bizHandle

并且这个注解继承了@Component,因此上面的三个事件都会被注入到Spring IoC容器中进行管理。

好了,有了上面这一系列铺垫,通过spring IoC容器我们就可以非常方便的指导当前正在调用的ReentryService服务的bizHandle方法中需要执行的事件是什么,并且这些事件之间执行的先后顺序。

事件容器和事件队列

有了上面的内容,我就可以非常方便的在服务执行的过程中知道我当前正在执行的服务将要执行哪些事件,这些事件执行的先后顺序是怎么样的。那么,接下来我需要做的就是把这些事件编排起来,使用一个统一的容器来管理这些事件。先给出事件容器的代码(一般是一个服务的一个方法对应一个事件容器)

@Slf4j(topic = "APPLICATION")
public class EventContainer implements Event {

    private String serviceName;

    private String methodName;

    @Getter
    private final Queue<EventNode> eventNodes = new PriorityQueue<>();

    public EventContainer(String serviceName, String methodName) {
        this.serviceName = serviceName;
        this.methodName = methodName;
    }

    public void init() {

        Map<String, Object> eventBeans = Env.getBeansWithAnnotation(RelatedService.class);

        log.info("EventPipeline.init success, eventBeans={}",eventBeans);

        for (Map.Entry<String, Object> beanEntry : eventBeans.entrySet()) {
            String eventName = beanEntry.getKey();
            Object beanObj = beanEntry.getValue();
            if (!(beanObj instanceof Event)) {
                continue;
            }
            RelatedService relatedService = beanObj.getClass().getAnnotation(RelatedService.class);
            String serviceName = relatedService.serviceName();
            String methodName = relatedService.methodName();
            int order = relatedService.order();
            if (!serviceName.equals(this.serviceName)) {
                continue;
            }
            if (!methodName.equals(this.methodName)) {
                continue;
            }
            EventNode curEventNode = new EventNode(order,  (Event)beanObj);
            eventNodes.offer(curEventNode);
        }

        log.info("EventPipeline.init success, {}",eventNodes);
    }

    @Override
    public void handle(Context context) throws Exception {
        init();
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }

    @Override
    public void postHandle(StockDiffContext context) throws Exception {

    }
}

EventContainer也是Event的实现类,其handle(Context)方法是整个事件流执行的入口。

在EventContainer中有三个非常重要的内容:

  • serviceName,事件容器对应的服务名
  • methodName,事件容器对应的方法名
  • Queue eventNodes,事件节点的优先级队列(事件队列),事件会在这个队列中被依次执行

而事件队列的初始化就是通过前面的@RelatedService注解实现的,具体见代码。

EventNode的定义代码:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;

    EventNode(){}

    public EventNode(String name, int order) {
        this.nodeName = name;
        this.order = order;
    }

    public EventNode(String name, int order, Event curEvent) {
        this(name,order);
        this.curEvent = curEvent;
    }

    public EventNode(int order, Event curEvent) {
        this.curEvent = curEvent;
        this.order = order;
    }


    @Override
    public int compareTo(EventNode o) {
        return this.order - o.order;
    }

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
    }
}

有了上面的内容,我们就构建了一整套基于事件流的服务调用机制了。总结一下:

  • Event,事件的接口,规定了事件执行的动作
  • EventNode,事件节点,方便对事件进行编排
  • EventContainer,事件容器,管理服务所对应的事件,并负责执行事件
  • @RelatedService,事件关联服务的注解

基于事件流的机制,本文开始的例子,代码可以修改为:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       Entity1 entity1 = queryEntityByName(context.getEntityName());
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}

支持可重入

上面所述的部分仅仅是实现了基本的事件流的处理机制,但是还不具备接口的可重入能力,要想实现可重入,就必须在服务调用的时候,对调用的进度进行持久化,这样方便下次调用的时候,对服务进行恢复。

想实现这一点,有很多方法,最简单的就是使用基于幂等+标志位的方式,持久化在db中,谢谢我的师兄提供了这个非常简单实用的方案。

代码如下:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       // 计算幂等键,保证唯一
       String idempotentKey = buildIdempotentKey(serviceName,methodName,context.getEntityName());
       IdempotentDO idem = queryIdempotent(idempotentKey);
       if (idem != null) {
            context.setExecuteProcess(idem.getExecuteProcess());
            EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
            eventContainer.handle(context);
            return;
       }
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}

相应的,每个事件节点在执行的时候,都要重写postHandle(Context)方法,如下:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;
    
    //省略其他

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
        //更新执行进度,如果没有这条幂等记录,就插入
        updateExecuteProcess(context.getIdempotentKey(),this.order);
    }
}

还差最后一步改造就大功告成了,重写EventContainer的handle(Context)方法

    @Override
    public void handle(Context context) throws Exception {
        init();
        int process = context.getExecuteProcess();
        int idx = 0;
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            if (++idx <= process){
                continue;
            }
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }

写在最后

这点总结记录了自己日常开发中针对痛点的一些想法,可能并不是很成熟,要是和大家有共鸣的话,那真是太好了,欢迎批评指教~

相关文章
|
2月前
|
算法 安全 编译器
并发的三大特性
并发的三大特性
50 1
|
2月前
|
安全 Go
Go语言并发新特性:单向通道的读写控制
Go语言并发新特性:单向通道的读写控制
54 0
|
9月前
|
存储 Linux 调度
确保并发执行的安全性:探索多线程和锁机制以构建可靠的程序
在当今计算机系统中,多线程编程已成为常见的需求,然而,同时也带来了并发执行的挑战。为了避免数据竞争和其他并发问题,正确使用适当的锁机制是至关重要的。通过阅读本文,读者将了解到多线程和锁机制在并发编程中的重要性,以及如何避免常见的并发问题,确保程序的安全性和可靠性。通过实际案例和代码示例来说明如何正确地使用多线程和锁机制来构建可靠的程序。
25 1
|
10天前
|
调度
【浅入浅出】Qt多线程机制解析:提升程序响应性与并发处理能力
在学习QT线程的时候我们首先要知道的是QT的主线程,也叫GUI线程,意如其名,也就是我们程序的最主要的一个线程,主要负责初始化界面并监听事件循环,并根据事件处理做出界面上的反馈。但是当我们只限于在一个主线程上书写逻辑时碰到了需要一直等待的事件该怎么办?它的加载必定会带着主界面的卡顿,这时候我们就要去使用多线程。
|
5天前
|
设计模式 存储 缓存
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
9 0
|
5天前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
9 0
|
5天前
|
设计模式 并行计算 安全
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
10 0
|
2月前
|
Linux 程序员 C++
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
【C++ 常见的异步机制】探索现代异步编程:从 ASIO 到协程的底层机制解析
421 2
|
2月前
|
Go 调度 开发者
Go语言并发基础:轻量级线程与通道通信
【2月更文挑战第6天】本文介绍了Go语言在并发编程方面的基础知识和核心概念。我们将深入探讨goroutine(轻量级线程)的创建与调度,以及如何利用channel进行goroutine间的通信与同步。此外,还将简要提及select语句的使用,并解释其在处理多个channel操作时的优势。
|
2月前
|
存储 运维 流计算
流计算中的容错机制是什么?请解释其作用和常用方法。
流计算中的容错机制是什么?请解释其作用和常用方法。
34 0