简化ETL工作,编写一个Canal胶水层(上)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云解析 DNS,旗舰版 1个月
简介: 这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。

前提



这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。


之前写的几篇文章里面其中一篇曾经提到过Canal解析MySQLbinlog事件后的对象如下(来源于Canal源码com.alibaba.otter.canal.protocol.FlatMessage):


网络异常,图片无法展示
|


如果直接对此原始对象进行解析,那么会出现很多解析模板代码,一旦有改动就会牵一发动全身,这是我们不希望发生的一件事。于是花了一点点时间写了一个Canal胶水层,让接收到的FlatMessage根据表名称直接转换为对应的DTO实例,这样能在一定程度上提升开发效率并且减少模板化代码,这个胶水层的数据流示意图如下:


网络异常,图片无法展示
|


要编写这样的胶水层主要用到:

  • 反射。
  • 注解。
  • 策略模式。
  • IOC容器(可选)。


项目的模块如下:

  • canal-glue-core:核心功能。
  • spring-boot-starter-canal-glue:适配SpringIOC容器,添加自动配置。
  • canal-glue-example:使用例子和基准测试。


下文会详细分析此胶水层如何实现。


引入依赖



为了不污染引用此模块的外部服务依赖,除了JSON转换的依赖之外,其他依赖的scope定义为provide或者test类型,依赖版本和BOM如下:


<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <lombok.version>1.18.12</lombok.version>
        <fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>
复制代码


其中,canal-glue-core模块本质上只依赖于fastjson,可以完全脱离spring体系使用。


基本架构



这里提供一个"后知后觉"的架构图,因为之前为了快速怼到线上,初版没有考虑这么多,甚至还耦合了业务代码,组件是后来抽离出来的:


网络异常,图片无法展示
|


设计配置模块(已经移除)



设计配置模块在设计的时候考虑使用了外置配置文件和纯注解两种方式,前期使用了JSON外置配置文件的方式,纯注解是后来增加的,二选一。这一节简单介绍一下JSON外置配置文件的配置加载,纯注解留到后面处理器模块时候分析。


当初是想快速进行胶水层的开发,所以配置文件使用了可读性比较高的JSON格式:


{
  "version": 1,
  "module": "canal-glue",
  "databases": [
    {
      "database": "db_payment_service",
      "processors": [
        {
          "table": "payment_order",
          "processor": "x.y.z.PaymentOrderProcessor",
          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
        }
      ]
    },
    {
      ......
    }
  ]
}
复制代码


JSON配置在设计的时候尽可能不要使用JSON Array作为顶层配置,因为这样做设计的对象会比较怪


因为使用该模块的应用有可能需要处理Canal解析多个上游数据库的binlog事件,所以配置模块设计的时候需要以databaseKEY,挂载多个table以及对应的表binlog事件处理器以及异常处理器。然后对着JSON文件的格式撸一遍对应的实体类出来:


@Data
public class CanalGlueProcessorConf {
    private String table;
    private String processor;
    private String exceptionHandler;
}
@Data
public class CanalGlueDatabaseConf {
    private String database;
    private List<CanalGlueProcessorConf> processors;
}
@Data
public class CanalGlueConf {
    private Long version;
    private String module;
    private List<CanalGlueDatabaseConf> database;
}
复制代码


实体编写完,接着可以编写一个配置加载器,简单起见,配置文件直接放ClassPath之下,加载器如下:


public interface CanalGlueConfLoader {
    CanalGlueConf load(String location);
}
// 实现
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {
    @Override
    public CanalGlueConf load(String location) {
        ClassPathResource resource = new ClassPathResource(location);
        Assert.isTrue(resource.exists(), String.format("类路径下不存在文件%s", location));
        try {
            String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            return JSON.parseObject(content, CanalGlueConf.class);
        } catch (IOException e) {
            // should not reach
            throw new IllegalStateException(e);
        }
    }
}
复制代码


读取ClassPath下的某个location为绝对路径的文件内容字符串,然后使用Fasfjson转成CanalGlueConf对象。这个是默认的实现,使用canal-glue模块可以覆盖此实现,通过自定义的实现加载配置。


JSON配置模块在后来从业务系统抽离此胶水层的时候已经完全废弃,使用纯注解驱动和核心抽象组件继承的方式实现。


核心模块开发



主要包括几个模块:

  • 基本模型定义。
  • 适配器层开发。
  • 转换器和解析器层开发。
  • 处理器层开发。
  • 全局组件自动配置模块开发(仅限于Spring体系,已经抽取到spring-boot-starter-canal-glue模块)。
  • CanalGlue开发。


基本模型定义


定义顶层的KEY,也就是对于某个数据库的某一个确定的表,需要一个唯一标识:


// 模型表对象
public interface ModelTable {
    String database();
    String table();
    static ModelTable of(String database, String table) {
        return DefaultModelTable.of(database, table);
    }
}
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {
    private final String database;
    private final String table;
    @Override
    public String database() {
        return database;
    }
    @Override
    public String table() {
        return table;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultModelTable that = (DefaultModelTable) o;
        return Objects.equals(database, that.database) &&
                Objects.equals(table, that.table);
    }
    @Override
    public int hashCode() {
        return Objects.hash(database, table);
    }
}
复制代码


这里实现类DefaultModelTable重写了equals()hashCode()方法便于把ModelTable实例应用为HashMap容器的KEY,这样后面就可以设计ModelTable -> Processor的缓存结构。


由于Canal投放到Kafka的事件内容是一个原始字符串,所以要定义一个和前文提到的FlatMessage基本一致的事件类CanalBinLogEvent


@Data
public class CanalBinLogEvent {
    /**
     * 事件ID,没有实际意义
     */
    private Long id;
    /**
     * 当前更变后节点数据
     */
    private List<Map<String, String>> data;
    /**
     * 主键列名称列表
     */
    private List<String> pkNames;
    /**
     * 当前更变前节点数据
     */
    private List<Map<String, String>> old;
    /**
     * 类型 UPDATE\INSERT\DELETE\QUERY
     */
    private String type;
    /**
     * binlog execute time
     */
    private Long es;
    /**
     * dml build timestamp
     */
    private Long ts;
    /**
     * 执行的sql,不一定存在
     */
    private String sql;
    /**
     * 数据库名称
     */
    private String database;
    /**
     * 表名称
     */
    private String table;
    /**
     * SQL类型映射
     */
    private Map<String, Integer> sqlType;
    /**
     * MySQL字段类型映射
     */
    private Map<String, String> mysqlType;
    /**
     * 是否DDL
     */
    private Boolean isDdl;
}
复制代码


根据此事件对象,再定义解析完毕后的结果对象CanalBinLogResult


// 常量
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {
    QUERY("QUERY", "查询"),
    INSERT("INSERT", "新增"),
    UPDATE("UPDATE", "更新"),
    DELETE("DELETE", "删除"),
    ALTER("ALTER", "列修改操作"),
    UNKNOWN("UNKNOWN", "未知"),
    ;
    private final String type;
    private final String description;
    public static BinLogEventType fromType(String type) {
        for (BinLogEventType binLogType : BinLogEventType.values()) {
            if (binLogType.getType().equals(type)) {
                return binLogType;
            }
        }
        return BinLogEventType.UNKNOWN;
    }
}
// 常量
@RequiredArgsConstructor
@Getter
public enum OperationType {
    /**
     * DML
     */
    DML("dml", "DML语句"),
    /**
     * DDL
     */
    DDL("ddl", "DDL语句"),
    ;
    private final String type;
    private final String description;
}
@Data
public class CanalBinLogResult<T> {
    /**
     * 提取的长整型主键
     */
    private Long primaryKey;
    /**
     * binlog事件类型
     */
    private BinLogEventType binLogEventType;
    /**
     * 更变前的数据
     */
    private T beforeData;
    /**
     * 更变后的数据
     */
    private T afterData;
    /**
     * 数据库名称
     */
    private String databaseName;
    /**
     * 表名称
     */
    private String tableName;
    /**
     * sql语句 - 一般是DDL的时候有用
     */
    private String sql;
    /**
     * MySQL操作类型
     */
    private OperationType operationType;
}
复制代码


开发适配器层


定义顶层的适配器SPI接口:


public interface SourceAdapter<SOURCE, SINK> {
    SINK adapt(SOURCE source);
}
复制代码


接着开发适配器实现类:


// 原始字符串直接返回
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {
    @Override
    public String adapt(String source) {
        return source;
    }
}
// Fastjson转换
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {
    private final Class<T> klass;
    @Override
    public T adapt(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return JSON.parseObject(source, klass);
    }
}
// Facade
public enum SourceAdapterFacade {
    /**
     * 单例
     */
    X;
    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();
    @SuppressWarnings("unchecked")
    public <T> T adapt(Class<T> klass, String source) {
        if (klass.isAssignableFrom(String.class)) {
            return (T) I_S_A.adapt(source);
        }
        return FastJsonSourceAdapter.of(klass).adapt(source);
    }
}
复制代码


最终直接使用SourceAdapterFacade#adapt()方法即可,因为实际上绝大多数情况下只会使用原始字符串和String -> Class实例,适配器层设计可以简单点。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
SQL Java 数据库
建模底层逻辑问题之ORM框架建模中,执行SQL的过程中被抽象和组织是如何实现的
建模底层逻辑问题之ORM框架建模中,执行SQL的过程中被抽象和组织是如何实现的
|
2月前
|
前端开发 Java 应用服务中间件
Java应用结构规范问题之dal层实现对数据源的操作的问题如何解决
Java应用结构规范问题之dal层实现对数据源的操作的问题如何解决
|
2月前
|
SQL 存储 数据管理
掌握SQL Server Integration Services (SSIS)精髓:从零开始构建自动化数据提取、转换与加载(ETL)流程,实现高效数据迁移与集成——轻松上手SSIS打造企业级数据管理利器
【8月更文挑战第31天】SQL Server Integration Services (SSIS) 是 Microsoft 提供的企业级数据集成平台,用于高效完成数据提取、转换和加载(ETL)任务。本文通过简单示例介绍 SSIS 的基本使用方法,包括创建数据包、配置数据源与目标以及自动化执行流程。首先确保安装了 SQL Server Data Tools (SSDT),然后在 Visual Studio 中创建新的 SSIS 项目,通过添加控制流和数据流组件,实现从 CSV 文件到 SQL Server 数据库的数据迁移。
106 0
|
3月前
|
存储 数据库
软件交付问题之关于数据存储层的编写,如何解决
软件交付问题之关于数据存储层的编写,如何解决
|
3月前
|
测试技术 数据库 容器
开发与运维测试问题之操作数据库进行DAO层测试如何解决
开发与运维测试问题之操作数据库进行DAO层测试如何解决
|
4月前
|
存储 数据库连接 数据库
逆向学习数据库篇:表设计和数据库操作的核心概念与流程
逆向学习数据库篇:表设计和数据库操作的核心概念与流程
31 0
|
5月前
|
供应链 JavaScript API
简化数据迁移:API接口的应用
在现代商业环境中,数据迁移已经成为企业运营中不可或缺的一部分。随着技术的进步和市场的变化,企业需要将商品数据从一个系统迁移到另一个更高效或更具成本效益的系统。这一过程可以借助应用程序编程接口(API)来简化。以下是如何通过使用API接口来简化商品数据迁移过程的详细步骤。
|
SQL Oracle 关系型数据库
「集成架构」2020年最好的15个ETL工具(第二部)
「集成架构」2020年最好的15个ETL工具(第二部)
|
存储 SQL Kubernetes
【数据库自动化测试流程构建】各模块简介
【数据库自动化测试流程构建】各模块简介
180 0