基于事件流的轻量级异常容错设计—支持接口可重入

简介: ## 写在前面 在我们平时的业务代码中,最常见的代码结构就是外部的请求打过来,首先进行必要的参数校验,接着根据参数对关联实体的状态进行校验,然后再校验业务逻辑,最后推进关联实体的状态。下面以一段代码简单示例一下 ```java pulic class ReentryServiceImpl implements ReentryService { publi

写在前面

在我们平时的业务代码中,最常见的代码结构就是外部的请求打过来,首先进行必要的参数校验,接着根据参数对关联实体的状态进行校验,然后再校验业务逻辑,最后推进关联实体的状态。下面以一段代码简单示例一下

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(String entityName) {
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       entity1Service.finishEntity1(entity);
       entity2Service.createEntity2(entity.getEntity2());
       entity3Service.rollBackEntity3(entity.getEntity3());
    }
    
}

在这种代码当中,如果后面三行状态推进的服务调用失败,如果缺乏适当的异常处理机制的话,就会导致整个服务不可用,在bizHandle(String)方法重试的时候,可能由于entity1Service.finishEntity1(entity);调用成功导致方法入口处的状态校验就失败了,接着方法直接返回(或者抛出异常),最终造成关联实体的状态不一致。

当然,对于这种情况,由于方法的入口处的校验是针对entity1来做的,那么在后续状态推进的过程中,调整状态推进的顺序,把entity1的状态推进放在最后来做,然后基于服务调用的幂等性,其实也可以保证重试时的可重入。但是这种做法过度依赖业务场景,不具备通用的场景处理能力哦。如果此处的校验并不是针对entity1来做的,而是对entity1,entity2,entity3来做的联合的状态校验,那么不管如何调整后续状态推进的顺序,都是无法保证重试的可重入的。

因此,凌驾于具体场景之上,我希望做到的是,编写业务代码的时候,可以尽量降低开发人员的负担,使用一种统一的处理机制来保证接口重试时的可重入能力。

服务与关联事件

通过上面的概述,相信大家已经知道我要做一件什么样的事了——异常情况下支持接口重试时的可重入机制。用白话说就是在接口第一次调用失败的情况下,后续进行重试,为了避免重试时状态校验失败而导致重试失败,我需要知道上一次服务调用的时候,方法执行到了哪一步,然后,可以直接从这一步开始继续往下执行(为了保证业务数据正确,所有的外部服务接口都需要支持幂等)。

为了达到这一目的,自然想到的就是使用一个列表或者队列把所有的服务调用组织起来,比如上述例子中的:

entity1Service.finishEntity1(entity);
entity2Service.createEntity2(entity.getEntity2());
entity3Service.rollBackEntity3(entity.getEntity3());

并且,必须保证这些服务调用的先后顺序是确定的(任意一次重试的时候,看到的这三个服务调用顺序都是一致的)。首先我把这些服务调用都当做是ReentryServiceImpl.bizHandle(String)方法执行过程中的事件,在做bizHandle的时候,需要依次去执行entity1Service.finishEntity1(entity)、entity2Service.createEntity2(entity.getEntity2())和entity3Service.rollBackEntity3(entity.getEntity3())。因此,首先定义一个通用事件接口:

public interface Event {

    /**
     * 执行本次事件的任务
     *
     * @throws Exception 异常
     */
    void handle(Context context) throws Exception;

    /**
     * 倒序执行本次事件,当前事件的所有后续事件都执行完了,执行该方法
     * @throws Exception 异常
     */
    void postHandle(Context context) throws Exception;
}

有了这个接口之后,就可以对上述服务中的三个事件进行定义了。依次为:

@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 1)
public class FinishEntity1Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity1Service.finishEntity1(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 2)
public class CreateEntity2Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity2Service.createEntity2(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}
@RelatedService(serviceName = "ReentryService", methodName = "bizHandle", order = 3)
public class RollBackEntity3Event implements Event {

    @Override
    public void handle(Context context) throws Exception {
        entity3Service.rollBackEntity3(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        //log.info("Context success, context={}",context);
    }
}

好了,到此三个事件定义好了,估计大家看到在事件上打了个注解@RelatedService,先给出代码:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Component
public @interface RelatedService {

    /**
     * 事件关联的服务 {@link Event}
     *
     * @return 一般为服务的bean name
     */
    String serviceName() default "";

    /**
     *
     * @return 调用事件的服务方法名
     */
    String methodName() default "";

    /**
     * 事件在服务中的执行顺序, 值越小,执行顺序越靠前
     * @return 优先级
     */
    int order() default 0;

}

在这个注解上总共有三个参数,分别为:

  • order:可以定义事件执行先后顺序,值越小,执行顺序越靠前
  • serviceName:事件执行时所在服务名,一般为bean的名字,在上面这个例子中就是:ReentryService
  • methodName:执行事件的方法名,用户自定义的,在上面的例子中可以是:bizHandle

并且这个注解继承了@Component,因此上面的三个事件都会被注入到Spring IoC容器中进行管理。

好了,有了上面这一系列铺垫,通过spring IoC容器我们就可以非常方便的指导当前正在调用的ReentryService服务的bizHandle方法中需要执行的事件是什么,并且这些事件之间执行的先后顺序。

事件容器和事件队列

有了上面的内容,我就可以非常方便的在服务执行的过程中知道我当前正在执行的服务将要执行哪些事件,这些事件执行的先后顺序是怎么样的。那么,接下来我需要做的就是把这些事件编排起来,使用一个统一的容器来管理这些事件。先给出事件容器的代码(一般是一个服务的一个方法对应一个事件容器)

@Slf4j(topic = "APPLICATION")
public class EventContainer implements Event {

    private String serviceName;

    private String methodName;

    @Getter
    private final Queue<EventNode> eventNodes = new PriorityQueue<>();

    public EventContainer(String serviceName, String methodName) {
        this.serviceName = serviceName;
        this.methodName = methodName;
    }

    public void init() {

        Map<String, Object> eventBeans = Env.getBeansWithAnnotation(RelatedService.class);

        log.info("EventPipeline.init success, eventBeans={}",eventBeans);

        for (Map.Entry<String, Object> beanEntry : eventBeans.entrySet()) {
            String eventName = beanEntry.getKey();
            Object beanObj = beanEntry.getValue();
            if (!(beanObj instanceof Event)) {
                continue;
            }
            RelatedService relatedService = beanObj.getClass().getAnnotation(RelatedService.class);
            String serviceName = relatedService.serviceName();
            String methodName = relatedService.methodName();
            int order = relatedService.order();
            if (!serviceName.equals(this.serviceName)) {
                continue;
            }
            if (!methodName.equals(this.methodName)) {
                continue;
            }
            EventNode curEventNode = new EventNode(order,  (Event)beanObj);
            eventNodes.offer(curEventNode);
        }

        log.info("EventPipeline.init success, {}",eventNodes);
    }

    @Override
    public void handle(Context context) throws Exception {
        init();
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }

    @Override
    public void postHandle(StockDiffContext context) throws Exception {

    }
}

EventContainer也是Event的实现类,其handle(Context)方法是整个事件流执行的入口。

在EventContainer中有三个非常重要的内容:

  • serviceName,事件容器对应的服务名
  • methodName,事件容器对应的方法名
  • Queue eventNodes,事件节点的优先级队列(事件队列),事件会在这个队列中被依次执行

而事件队列的初始化就是通过前面的@RelatedService注解实现的,具体见代码。

EventNode的定义代码:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;

    EventNode(){}

    public EventNode(String name, int order) {
        this.nodeName = name;
        this.order = order;
    }

    public EventNode(String name, int order, Event curEvent) {
        this(name,order);
        this.curEvent = curEvent;
    }

    public EventNode(int order, Event curEvent) {
        this.curEvent = curEvent;
        this.order = order;
    }


    @Override
    public int compareTo(EventNode o) {
        return this.order - o.order;
    }

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
    }
}

有了上面的内容,我们就构建了一整套基于事件流的服务调用机制了。总结一下:

  • Event,事件的接口,规定了事件执行的动作
  • EventNode,事件节点,方便对事件进行编排
  • EventContainer,事件容器,管理服务所对应的事件,并负责执行事件
  • @RelatedService,事件关联服务的注解

基于事件流的机制,本文开始的例子,代码可以修改为:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       Entity1 entity1 = queryEntityByName(context.getEntityName());
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}

支持可重入

上面所述的部分仅仅是实现了基本的事件流的处理机制,但是还不具备接口的可重入能力,要想实现可重入,就必须在服务调用的时候,对调用的进度进行持久化,这样方便下次调用的时候,对服务进行恢复。

想实现这一点,有很多方法,最简单的就是使用基于幂等+标志位的方式,持久化在db中,谢谢我的师兄提供了这个非常简单实用的方案。

代码如下:

pulic class ReentryServiceImpl implements ReentryService {
        
    public void bizHandle(Context context) {
       // 计算幂等键,保证唯一
       String idempotentKey = buildIdempotentKey(serviceName,methodName,context.getEntityName());
       IdempotentDO idem = queryIdempotent(idempotentKey);
       if (idem != null) {
            context.setExecuteProcess(idem.getExecuteProcess());
            EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
            eventContainer.handle(context);
            return;
       }
       Entity1 entity1 = queryEntityByName(entityName);
        if (entity1 == null) {
          return;
        }
        if (!entity1.getStatus.equals(Entity1StatusEnums.ACCEPT)) {
          return;
       }
       EventContainer eventContainer = new EventContainer("ReentryService", "bizHandle");
       eventContainer.handle(context);
    }
    
}

相应的,每个事件节点在执行的时候,都要重写postHandle(Context)方法,如下:

public class EventNode implements Comparable<EventNode>, Event {

    @Getter
    private String nodeName;

    @Getter
    private int order;

    @Getter
    private Event curEvent;
    
    //省略其他

    @Override
    public void handle(Context context) throws Exception {
        this.curEvent.handle(context);
    }

    @Override
    public void postHandle(Context context) throws Exception {
        this.curEvent.postHandle(context);
        //更新执行进度,如果没有这条幂等记录,就插入
        updateExecuteProcess(context.getIdempotentKey(),this.order);
    }
}

还差最后一步改造就大功告成了,重写EventContainer的handle(Context)方法

    @Override
    public void handle(Context context) throws Exception {
        init();
        int process = context.getExecuteProcess();
        int idx = 0;
        while (eventNodes.peek() != null) {
            EventNode curNode = eventNodes.poll();
            if (++idx <= process){
                continue;
            }
            curNode.handle(context);
            log.info("EventContainer.handle, {},{}",curNode.getOrder(), curNode.getCurEvent());
        }
    }

写在最后

这点总结记录了自己日常开发中针对痛点的一些想法,可能并不是很成熟,要是和大家有共鸣的话,那真是太好了,欢迎批评指教~

相关文章
|
3月前
|
消息中间件 缓存 监控
在FaaS中,如何设计无状态的函数来确保数据处理的一致性?
在FaaS中,如何设计无状态的函数来确保数据处理的一致性?
|
3月前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
53 4
|
5月前
|
存储 缓存 自然语言处理
Lettuce的特性和内部实现问题之分布式环境中消息发送时的问题如何解决
Lettuce的特性和内部实现问题之分布式环境中消息发送时的问题如何解决
|
5月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
6月前
|
Java
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
通用快照方案问题之调整Hystrix的信号量隔离模式的并发限制如何解决
48 0
|
6月前
|
设计模式 存储 缓存
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
Java面试题:结合建造者模式与内存优化,设计一个可扩展的高性能对象创建框架?利用多线程工具类与并发框架,实现一个高并发的分布式任务调度系统?设计一个高性能的实时事件通知系统
72 0
|
6月前
|
设计模式 安全 Java
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
Java面试题:如何实现一个线程安全的单例模式,并确保其在高并发环境下的内存管理效率?如何使用CyclicBarrier来实现一个多阶段的数据处理任务,确保所有阶段的数据一致性?
84 0
|
8月前
|
缓存 NoSQL Java
【亮剑】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护,如何使用注解来实现 Redis 分布式锁的功能?
【4月更文挑战第30天】分布式锁是保证多服务实例同步的关键机制,常用于互斥访问共享资源、控制访问顺序和系统保护。基于 Redis 的分布式锁利用 SETNX 或 SET 命令实现,并考虑自动过期、可重入及原子性以确保可靠性。在 Java Spring Boot 中,可通过 `@EnableCaching`、`@Cacheable` 和 `@CacheEvict` 注解轻松实现 Redis 分布式锁功能。
144 0
|
前端开发 Java 微服务
微服务之间调用的异常应该如何处理
在分布式服务的场景下,业务服务都将进行拆分,不同服务之间都会相互调用,如何做好异常处理是比较关键的,可以让业务人员在页面使用系统报错后,很清楚的看到服务报错的原因,而不是返回代码级别的异常报错,比如NullException、IllegalArgumentException、FeignExecption等异常报错,这样就会让非技术人员看到了一头雾水,从而很降低用户的体验感。
|
存储 Java 数据处理
响应式流的核心机制——背压机制
响应式流的核心机制——背压机制
227 0