设计模式最佳套路2 —— 愉快地使用管道模式

简介: 管道模式(Pipeline Pattern) 是责任链模式(Chain of Responsibility Pattern)的常用变体之一。在管道模式中,管道扮演着流水线的角色,将数据传递到一个加工处理序列中,数据在每个步骤中被加工处理后,传递到下一个步骤进行加工处理,直到全部步骤处理完毕。PS:纯的责任链模式在链上只会有一个处理器用于处理数据,而管道模式上多个处理器都会处理数据。

本篇为设计模式第二篇,第一篇可见设计模式最佳套路 —— 愉快地使用策略模式

管道模式(Pipeline Pattern) 是责任链模式(Chain of Responsibility Pattern)的常用变体之一。在管道模式中,管道扮演着流水线的角色,将数据传递到一个加工处理序列中,数据在每个步骤中被加工处理后,传递到下一个步骤进行加工处理,直到全部步骤处理完毕。

PS:纯的责任链模式在链上只会有一个处理器用于处理数据,而管道模式上多个处理器都会处理数据。

何时使用管道模式

任务代码较为复杂,需要拆分为多个子步骤时,尤其是后续可能在任意位置添加新的子步骤、删除旧的子步骤、交换子步骤顺序,可以考虑使用管道模式。

018600e9f009944658bd3e60b223139f.gif

愉快地使用管道模式

背景回放

最开始做模型平台的时候,创建模型实例的功能,包括:“输入数据校验 -> 根据输入创建模型实例 -> 保存模型实例到相关 DB 表”总共三个步骤,也不算复杂,所以当时的代码大概是这样的:

public class ModelServiceImpl implements ModelService {

    /**
     * 提交模型(构建模型实例)
     */
    public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {
        // 输入数据校验
        validateInput(request);
        // 根据输入创建模型实例
        ModelInstance instance = createModelInstance(request);
        // 保存实例到相关 DB 表
        saveInstance(instance);
    }
}

然而没有过多久,我们发现表单输入数据的格式并不完全符合模型的输入要求,于是我们要加入 “表单数据的预处理”。这功能还没动手呢,又有业务方提出自己也存在需要对数据进行处理的情况(比如根据商家的表单输入,生成一些其他业务数据作为模型输入)。

所以在 “输入数据校验” 之后,还需要加入 “表单输入输出预处理” 和 “业务方自定义数据处理(可选)”。这个时候我就面临一个选择:是否继续通过在 buildModelInstance 中加入新的方法来实现这些新的处理步骤?好处就是可以当下偷懒,但是坏处呢:

1、ModelService 应该只用来接收 HSF 请求,而不应该承载业务逻辑,如果将 提交模型 的逻辑都写在这个类当中,违反了 单一职责,而且后面会导致 类代码爆炸

2、将来每加入一个新的处理步骤或者删除某个步骤,我就要修改 buildModelInstance 这个本应该非常内聚的方法,违反了 开闭原则

所以,为了不给以后的自己挖坑,我觉得要思考一个万全的方案。这个时候,我小脑袋花开始飞转,突然闪过了 Netty 中的 ChannelPipeline —— 对哦,管道模式,不就正是我需要的嘛!

管道模式的实现方式也是多种多样,接下来基于前面的背景,我分享一下我目前基于 Spring 实现管道模式的 “最佳套路”(如果你有更好的套路,欢迎赐教,一起讨论哦)。

image.png

定义管道处理的上下文

/**
 * 传递到管道的上下文
 */
@Getter
@Setter
public class PipelineContext {

    /**
     * 处理开始时间
     */
    private LocalDateTime startTime;

    /**
     * 处理结束时间
     */
    private LocalDateTime endTime;

    /**
     * 获取数据名称
     */
    public String getName() {
        return this.getClass().getSimpleName();
    }
}

定义上下文处理器

/**
 * 管道中的上下文处理器
 */
public interface ContextHandler<T extends PipelineContext> {

    /**
     * 处理输入的上下文数据
     *
     * @param context 处理时的上下文数据
     * @return 返回 true 则表示由下一个 ContextHandler 继续处理,返回 false 则表示处理结束
     */
    boolean handle(T context);
}

为了方便说明,我们现在先定义出最早版 【提交模型逻辑】 的上下文和相关处理器:

/**
 * 模型实例构建的上下文
 */
@Getter
@Setter
public class InstanceBuildContext extends PipelineContext {

    /**
     * 模型 id
     */
    private Long modelId;

    /**
     * 用户 id
     */
    private long userId;

    /**
     * 表单输入
     */
    private Map<String, Object> formInput;

    /**
     * 保存模型实例完成后,记录下 id
     */
    private Long instanceId;

    /**
     * 模型创建出错时的错误信息
     */
    private String errorMsg;

    // 其他参数

    @Override
    public String getName() {
        return "模型实例构建上下文";
    }
}

处理器 - 输入数据校验:

@Component
public class InputDataPreChecker implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--输入数据校验--");

        Map<String, Object> formInput = context.getFormInput();

        if (MapUtils.isEmpty(formInput)) {
            context.setErrorMsg("表单输入数据不能为空");
            return false;
        }

        String instanceName = (String) formInput.get("instanceName");

        if (StringUtils.isBlank(instanceName)) {
            context.setErrorMsg("表单输入数据必须包含实例名称");
            return false;
        }

        return true;
    }
}

处理器 - 根据输入创建模型实例:

@Component
public class ModelInstanceCreator implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--根据输入数据创建模型实例--");

        // 假装创建模型实例

        return true;
    }
}

处理器 - 保存模型实例到相关DB表:

@Component
public class ModelInstanceSaver implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--保存模型实例到相关DB表--");

        // 假装保存模型实例

        return true;
    }
}

到这里,有个问题就出现了:应该使用什么样的方式,将同一种 Context 的 ContextHandler 串联为管道呢?思考一下:

1、给 ContextHandler 加一个 setNext 方法,每个实现类必须指定其下一个处理器。缺点也很明显,如果在当前管道中间加入一个新的 ContextHandler,那么要势必要修改前一个 ContextHandler 的 setNext 方法;另外,代码是写给人阅读的,这样做没法一眼就直观的知道整个管道的处理链路,还要进入到每个相关的 ContextHandler 中去查看才知道。

2、给 ContextHandler 加上 @Order 注解,根据 @Order 中给定的数字来确定每个 ContextHandler 的序列,一开始时每个数字间隔的可以大些(比如 10、20、30),后续加入新的 ContextHandler 时,可以指定数字为 (11、21、31)这种,那么可以避免上面方案中要修改代码的问题,但是仍然无法避免要进入每个相关的 ContextHandler 中去查看才能知道管道处理链路的问题。

3、提前写好一份路由表,指定好 ”Context -> 管道“ 的映射(管道用 List 来表示),以及管道中处理器的顺序 。Spring 来根据这份路由表,在启动时就构建好一个 Map,Map 的键为 Context 的类型,值为 管道(即 List)。这样的话,如果想知道每个管道的处理链路,直接看这份路由表就行,一目了然。缺点嘛,就是每次加入新的 ContextHandler 时,这份路由表也需要在对应管道上进行小改动 —— 但是如果能让阅读代码更清晰,我觉得这样的修改是值得的、可接受的~

bc74eee50e3e89dca430e77003b17321.gif

构建管道路由表

基于 Spring 的 Java Bean 配置,我们可以很方便的构建管道的路由表:

/**
 * 管道路由的配置
 */
@Configuration
public class PipelineRouteConfig implements ApplicationContextAware {

    /**
     * 数据类型->管道中处理器类型列表 的路由
     */
    private static final
    Map<Class<? extends PipelineContext>,
        List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);

    /*
     * 在这里配置各种上下文类型对应的处理管道:键为上下文类型,值为处理器类型的列表
     */
    static {
        PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,
                               Arrays.asList(
                                       InputDataPreChecker.class,
                                       ModelInstanceCreator.class,
                                       ModelInstanceSaver.class
                               ));

        // 将来其他 Context 的管道配置
    }

    /**
     * 在 Spring 启动时,根据路由表生成对应的管道映射关系
     */
    @Bean("pipelineRouteMap")
    public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
        return PIPELINE_ROUTE_MAP.entrySet()
                                 .stream()
                                 .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
    }

    /**
     * 根据给定的管道中 ContextHandler 的类型的列表,构建管道
     */
    private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
            Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
        return entry.getValue()
                    .stream()
                    .map(appContext::getBean)
                    .collect(Collectors.toList());
    }

    private ApplicationContext appContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appContext = applicationContext;
    }
}

定义管道执行器

最后一步,定义管道执行器。管道执行器 根据传入的上下文数据的类型,找到其对应的管道,然后将上下文数据放入管道中去进行处理。

/**
 * 管道执行器
 */
@Component
public class PipelineExecutor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 引用 PipelineRouteConfig 中的 pipelineRouteMap
     */
    @Resource
    private Map<Class<? extends PipelineContext>,
                List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;

    /**
     * 同步处理输入的上下文数据<br/>
     * 如果处理时上下文数据流通到最后一个处理器且最后一个处理器返回 true,则返回 true,否则返回 false
     *
     * @param context 输入的上下文数据
     * @return 处理过程中管道是否畅通,畅通返回 true,不畅通返回 false
     */
    public boolean acceptSync(PipelineContext context) {
        Objects.requireNonNull(context, "上下文数据不能为 null");
        // 拿到数据类型
        Class<? extends PipelineContext> dataType = context.getClass();
        // 获取数据处理管道
        List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);

        if (CollectionUtils.isEmpty(pipeline)) {
            logger.error("{} 的管道为空", dataType.getSimpleName());
            return false;
        }

        // 管道是否畅通
        boolean lastSuccess = true;

        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 当前处理器处理数据,并返回是否继续向下处理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }

            // 不再向下处理
            if (!lastSuccess) { break; }
        }

        return lastSuccess;
    }
}

使用管道模式

此时,我们可以将最开始的 buildModelInstance 修改为:

public CommonResponse<Long> buildModelInstance(InstanceBuildRequest request) {
    InstanceBuildContext data = createPipelineData(request);
    boolean success = pipelineExecutor.acceptSync(data);

    // 创建模型实例成功
    if (success) {
        return CommonResponse.success(data.getInstanceId());
    }

    logger.error("创建模式实例失败:{}", data.getErrorMsg());
    return CommonResponse.failed(data.getErrorMsg());
}

我们模拟一下模型实例的创建过程:

参数正常时:
image.png

参数出错时:
image.png

这个时候我们再为 InstanceBuildContext 加入新的两个 ContextHandler:FormInputPreprocessor(表单输入数据预处理) 和 BizSideCustomProcessor(业务方自定义数据处理)。

@Component
public class FormInputPreprocessor implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--表单输入数据预处理--");

        // 假装进行表单输入数据预处理

        return true;
    }
}
@Component
public class BizSideCustomProcessor implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--业务方自定义数据处理--");

        // 先判断是否存在自定义数据处理,如果没有,直接返回 true

        // 调用业务方的自定义的表单数据处理

        return true;
    }
}

此时 buildModelInstance 不需要做任何修改,我们只需要在 “路由表” 里面,将这两个 ContextHandler 加入到 InstanceBuildContext 关联的管道中,Spring 启动的时候,会自动帮我们构建好每种 Context 对应的管道:

image.png

加入新的处理器

再模拟一下模型实例的创建过程:

image.png

异步处理

管道执行器 PipelineExecutor 中,acceptSync 是个同步的方法。

小蜜:看名字你就知道你悄悄埋伏笔了。

image.png

对于步骤繁多的任务,很多时候我们更需要的是异步处理,比如某些耗时长的定时任务。管道处理异步化非常的简单,我们先定义一个线程池,比如:

<!-- 专门用于执行管道任务的线程池 -->
<bean id="pipelineThreadPool"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="4" /> <!-- 核心线程数 -->
    <property name="maxPoolSize" value="8" />  <!-- 最大线程数 -->
    <property name="keepAliveSeconds" value="960" />  <!-- 线程最大空闲时间/秒(根据管道使用情况指定)-->
    <property name="queueCapacity" value="256" />     <!-- 任务队列大小(根据管道使用情况指定)-->
    <property name="threadNamePrefix" value="pipelineThreadPool" />
    <property name="rejectedExecutionHandler">
        <bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" />
    </property>
</bean>

然后在 PipelineExecutor 中加入异步处理的方法:

/**
 * 管道线程池
 */
@Resource
private ThreadPoolTaskExecutor pipelineThreadPool;

/**
 * 异步处理输入的上下文数据
 *
 * @param context  上下文数据
 * @param callback 处理完成的回调
 */
public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {
    pipelineThreadPool.execute(() -> {
        boolean success = acceptSync(context);

        if (callback != null) {
            callback.accept(context, success);
        }
    });
}

通用处理

比如我们想记录下每次管道处理的时间,以及在处理前和处理后都打印相关的日志。那么我们可以提供两个通用的 ContextHandler,分别放在每个管道的头和尾:

@Component
public class CommonHeadHandler implements ContextHandler<PipelineContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(PipelineContext context) {
        logger.info("管道开始执行:context={}", JSON.toJSONString(context));

        // 设置开始时间
        context.setStartTime(LocalDateTime.now());

        return true;
    }
}
@Component
public class CommonTailHandler implements ContextHandler<PipelineContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(PipelineContext context) {
        // 设置处理结束时间
        context.setEndTime(LocalDateTime.now());

        logger.info("管道执行完毕:context={}", JSON.toJSONString(context));

        return true;
    }
}

通用头、尾处理器可以在路由表里面放置,但是每次新加一种 PipelineContext 都要加一次,好像没有必要 —— 我们直接修改下 管道执行器 PipelineExecutor 中的 acceptSync 方法:

@Component
public class PipelineExecutor {

    ......

    @Autowired
    private CommonHeadHandler commonHeadHandler;

    @Autowired
    private CommonTailHandler commonTailHandler;

    public boolean acceptSync(PipelineContext context) {
        ......

        // 【通用头处理器】处理
        commonHeadHandler.handle(context);

        // 管道是否畅通
        boolean lastSuccess = true;

        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 当前处理器处理数据,并返回是否继续向下处理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 处理异常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }

            // 不再向下处理
            if (!lastSuccess) { break; }
        }

        // 【通用尾处理器】处理
        commonTailHandler.handle(context);

        return lastSuccess;
    }
}

总结

通过管道模式,我们大幅降低了系统的耦合度和提升了内聚程度与扩展性:

  • ModelService 只负责处理 HSF 请求,不用关心具体的业务逻辑
  • PipelineExecutor 只做执行工作,不用关心具体的管道细节
  • 每个 ContextHandler 只负责自己那部分的业务逻辑,不需要知道管道的结构,与其他ContextHandler 的业务逻辑解耦
  • 新增、删除 或者 交换子步骤时,都只需要操作路由表的配置,而不要修改原来的调用代码

9dc17824c406b1ad78f0b55ad0a45cec.gif
关注「淘系技术」微信公众号,一个有温度有内容的技术社区~

image.png

相关文章
|
15天前
|
设计模式 安全 Java
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
|
2月前
|
设计模式 数据库连接 PHP
PHP中的设计模式:提升代码的可维护性与扩展性在软件开发过程中,设计模式是开发者们经常用到的工具之一。它们提供了经过验证的解决方案,可以帮助我们解决常见的软件设计问题。本文将介绍PHP中常用的设计模式,以及如何利用这些模式来提高代码的可维护性和扩展性。我们将从基础的设计模式入手,逐步深入到更复杂的应用场景。通过实际案例分析,读者可以更好地理解如何在PHP开发中应用这些设计模式,从而写出更加高效、灵活和易于维护的代码。
本文探讨了PHP中常用的设计模式及其在实际项目中的应用。内容涵盖设计模式的基本概念、分类和具体使用场景,重点介绍了单例模式、工厂模式和观察者模式等常见模式。通过具体的代码示例,展示了如何在PHP项目中有效利用设计模式来提升代码的可维护性和扩展性。文章还讨论了设计模式的选择原则和注意事项,帮助开发者在不同情境下做出最佳决策。
|
17天前
|
设计模式 开发者 Python
Python编程中的设计模式:工厂方法模式###
本文深入浅出地探讨了Python编程中的一种重要设计模式——工厂方法模式。通过具体案例和代码示例,我们将了解工厂方法模式的定义、应用场景、实现步骤以及其优势与潜在缺点。无论你是Python新手还是有经验的开发者,都能从本文中获得关于如何在实际项目中有效应用工厂方法模式的启发。 ###
|
10天前
|
设计模式 安全 Java
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
28 1
|
1月前
|
设计模式 Java Kotlin
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
本教程详细讲解Kotlin语法,适合希望深入了解Kotlin的开发者。对于快速学习Kotlin语法,推荐查看“简洁”系列教程。本文重点介绍了构建者模式在Kotlin中的应用与改良,包括如何使用具名可选参数简化复杂对象的创建过程,以及如何在初始化代码块中对参数进行约束和校验。
21 3
|
2月前
|
设计模式 算法 安全
设计模式——模板模式
模板方法模式、钩子方法、Spring源码AbstractApplicationContext类用到的模板方法
设计模式——模板模式
|
2月前
|
设计模式 数据库连接 PHP
PHP中的设计模式:如何提高代码的可维护性与扩展性在软件开发领域,PHP 是一种广泛使用的服务器端脚本语言。随着项目规模的扩大和复杂性的增加,保持代码的可维护性和可扩展性变得越来越重要。本文将探讨 PHP 中的设计模式,并通过实例展示如何应用这些模式来提高代码质量。
设计模式是经过验证的解决软件设计问题的方法。它们不是具体的代码,而是一种编码和设计经验的总结。在PHP开发中,合理地使用设计模式可以显著提高代码的可维护性、复用性和扩展性。本文将介绍几种常见的设计模式,包括单例模式、工厂模式和观察者模式,并通过具体的例子展示如何在PHP项目中应用这些模式。
|
2月前
|
设计模式 Java Spring
spring源码设计模式分析-代理设计模式(二)
spring源码设计模式分析-代理设计模式(二)
|
1月前
|
设计模式 安全 Java
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
Kotlin教程笔记(51) - 改良设计模式 - 构建者模式
34 0
|
2月前
|
设计模式 Java
Java设计模式-工厂方法模式(4)
Java设计模式-工厂方法模式(4)

热门文章

最新文章

  • 1
    C++一分钟之-设计模式:工厂模式与抽象工厂
    42
  • 2
    《手把手教你》系列基础篇(九十四)-java+ selenium自动化测试-框架设计基础-POM设计模式实现-下篇(详解教程)
    46
  • 3
    C++一分钟之-C++中的设计模式:单例模式
    54
  • 4
    《手把手教你》系列基础篇(九十三)-java+ selenium自动化测试-框架设计基础-POM设计模式实现-上篇(详解教程)
    38
  • 5
    《手把手教你》系列基础篇(九十二)-java+ selenium自动化测试-框架设计基础-POM设计模式简介(详解教程)
    62
  • 6
    Java面试题:结合设计模式与并发工具包实现高效缓存;多线程与内存管理优化实践;并发框架与设计模式在复杂系统中的应用
    57
  • 7
    Java面试题:设计模式在并发编程中的创新应用,Java内存管理与多线程工具类的综合应用,Java并发工具包与并发框架的创新应用
    41
  • 8
    Java面试题:如何使用设计模式优化多线程环境下的资源管理?Java内存模型与并发工具类的协同工作,描述ForkJoinPool的工作机制,并解释其在并行计算中的优势。如何根据任务特性调整线程池参数
    50
  • 9
    Java面试题:请列举三种常用的设计模式,并分别给出在Java中的应用场景?请分析Java内存管理中的主要问题,并提出相应的优化策略?请简述Java多线程编程中的常见问题,并给出解决方案
    106
  • 10
    Java面试题:设计模式如单例模式、工厂模式、观察者模式等在多线程环境下线程安全问题,Java内存模型定义了线程如何与内存交互,包括原子性、可见性、有序性,并发框架提供了更高层次的并发任务处理能力
    78