简化ETL工作,编写一个Canal胶水层(上)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。

前提



这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。


之前写的几篇文章里面其中一篇曾经提到过Canal解析MySQLbinlog事件后的对象如下(来源于Canal源码com.alibaba.otter.canal.protocol.FlatMessage):


网络异常,图片无法展示
|


如果直接对此原始对象进行解析,那么会出现很多解析模板代码,一旦有改动就会牵一发动全身,这是我们不希望发生的一件事。于是花了一点点时间写了一个Canal胶水层,让接收到的FlatMessage根据表名称直接转换为对应的DTO实例,这样能在一定程度上提升开发效率并且减少模板化代码,这个胶水层的数据流示意图如下:


网络异常,图片无法展示
|


要编写这样的胶水层主要用到:

  • 反射。
  • 注解。
  • 策略模式。
  • IOC容器(可选)。


项目的模块如下:

  • canal-glue-core:核心功能。
  • spring-boot-starter-canal-glue:适配SpringIOC容器,添加自动配置。
  • canal-glue-example:使用例子和基准测试。


下文会详细分析此胶水层如何实现。


引入依赖



为了不污染引用此模块的外部服务依赖,除了JSON转换的依赖之外,其他依赖的scope定义为provide或者test类型,依赖版本和BOM如下:


<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>2.3.0.RELEASE</spring.boot.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <lombok.version>1.18.12</lombok.version>
        <fastjson.version>1.2.73</fastjson.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <scope>import</scope>
            <type>pom</type>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>
复制代码


其中,canal-glue-core模块本质上只依赖于fastjson,可以完全脱离spring体系使用。


基本架构



这里提供一个"后知后觉"的架构图,因为之前为了快速怼到线上,初版没有考虑这么多,甚至还耦合了业务代码,组件是后来抽离出来的:


网络异常,图片无法展示
|


设计配置模块(已经移除)



设计配置模块在设计的时候考虑使用了外置配置文件和纯注解两种方式,前期使用了JSON外置配置文件的方式,纯注解是后来增加的,二选一。这一节简单介绍一下JSON外置配置文件的配置加载,纯注解留到后面处理器模块时候分析。


当初是想快速进行胶水层的开发,所以配置文件使用了可读性比较高的JSON格式:


{
  "version": 1,
  "module": "canal-glue",
  "databases": [
    {
      "database": "db_payment_service",
      "processors": [
        {
          "table": "payment_order",
          "processor": "x.y.z.PaymentOrderProcessor",
          "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler"
        }
      ]
    },
    {
      ......
    }
  ]
}
复制代码


JSON配置在设计的时候尽可能不要使用JSON Array作为顶层配置,因为这样做设计的对象会比较怪


因为使用该模块的应用有可能需要处理Canal解析多个上游数据库的binlog事件,所以配置模块设计的时候需要以databaseKEY,挂载多个table以及对应的表binlog事件处理器以及异常处理器。然后对着JSON文件的格式撸一遍对应的实体类出来:


@Data
public class CanalGlueProcessorConf {
    private String table;
    private String processor;
    private String exceptionHandler;
}
@Data
public class CanalGlueDatabaseConf {
    private String database;
    private List<CanalGlueProcessorConf> processors;
}
@Data
public class CanalGlueConf {
    private Long version;
    private String module;
    private List<CanalGlueDatabaseConf> database;
}
复制代码


实体编写完,接着可以编写一个配置加载器,简单起见,配置文件直接放ClassPath之下,加载器如下:


public interface CanalGlueConfLoader {
    CanalGlueConf load(String location);
}
// 实现
public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader {
    @Override
    public CanalGlueConf load(String location) {
        ClassPathResource resource = new ClassPathResource(location);
        Assert.isTrue(resource.exists(), String.format("类路径下不存在文件%s", location));
        try {
            String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
            return JSON.parseObject(content, CanalGlueConf.class);
        } catch (IOException e) {
            // should not reach
            throw new IllegalStateException(e);
        }
    }
}
复制代码


读取ClassPath下的某个location为绝对路径的文件内容字符串,然后使用Fasfjson转成CanalGlueConf对象。这个是默认的实现,使用canal-glue模块可以覆盖此实现,通过自定义的实现加载配置。


JSON配置模块在后来从业务系统抽离此胶水层的时候已经完全废弃,使用纯注解驱动和核心抽象组件继承的方式实现。


核心模块开发



主要包括几个模块:

  • 基本模型定义。
  • 适配器层开发。
  • 转换器和解析器层开发。
  • 处理器层开发。
  • 全局组件自动配置模块开发(仅限于Spring体系,已经抽取到spring-boot-starter-canal-glue模块)。
  • CanalGlue开发。


基本模型定义


定义顶层的KEY,也就是对于某个数据库的某一个确定的表,需要一个唯一标识:


// 模型表对象
public interface ModelTable {
    String database();
    String table();
    static ModelTable of(String database, String table) {
        return DefaultModelTable.of(database, table);
    }
}
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
public class DefaultModelTable implements ModelTable {
    private final String database;
    private final String table;
    @Override
    public String database() {
        return database;
    }
    @Override
    public String table() {
        return table;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DefaultModelTable that = (DefaultModelTable) o;
        return Objects.equals(database, that.database) &&
                Objects.equals(table, that.table);
    }
    @Override
    public int hashCode() {
        return Objects.hash(database, table);
    }
}
复制代码


这里实现类DefaultModelTable重写了equals()hashCode()方法便于把ModelTable实例应用为HashMap容器的KEY,这样后面就可以设计ModelTable -> Processor的缓存结构。


由于Canal投放到Kafka的事件内容是一个原始字符串,所以要定义一个和前文提到的FlatMessage基本一致的事件类CanalBinLogEvent


@Data
public class CanalBinLogEvent {
    /**
     * 事件ID,没有实际意义
     */
    private Long id;
    /**
     * 当前更变后节点数据
     */
    private List<Map<String, String>> data;
    /**
     * 主键列名称列表
     */
    private List<String> pkNames;
    /**
     * 当前更变前节点数据
     */
    private List<Map<String, String>> old;
    /**
     * 类型 UPDATE\INSERT\DELETE\QUERY
     */
    private String type;
    /**
     * binlog execute time
     */
    private Long es;
    /**
     * dml build timestamp
     */
    private Long ts;
    /**
     * 执行的sql,不一定存在
     */
    private String sql;
    /**
     * 数据库名称
     */
    private String database;
    /**
     * 表名称
     */
    private String table;
    /**
     * SQL类型映射
     */
    private Map<String, Integer> sqlType;
    /**
     * MySQL字段类型映射
     */
    private Map<String, String> mysqlType;
    /**
     * 是否DDL
     */
    private Boolean isDdl;
}
复制代码


根据此事件对象,再定义解析完毕后的结果对象CanalBinLogResult


// 常量
@RequiredArgsConstructor
@Getter
public enum BinLogEventType {
    QUERY("QUERY", "查询"),
    INSERT("INSERT", "新增"),
    UPDATE("UPDATE", "更新"),
    DELETE("DELETE", "删除"),
    ALTER("ALTER", "列修改操作"),
    UNKNOWN("UNKNOWN", "未知"),
    ;
    private final String type;
    private final String description;
    public static BinLogEventType fromType(String type) {
        for (BinLogEventType binLogType : BinLogEventType.values()) {
            if (binLogType.getType().equals(type)) {
                return binLogType;
            }
        }
        return BinLogEventType.UNKNOWN;
    }
}
// 常量
@RequiredArgsConstructor
@Getter
public enum OperationType {
    /**
     * DML
     */
    DML("dml", "DML语句"),
    /**
     * DDL
     */
    DDL("ddl", "DDL语句"),
    ;
    private final String type;
    private final String description;
}
@Data
public class CanalBinLogResult<T> {
    /**
     * 提取的长整型主键
     */
    private Long primaryKey;
    /**
     * binlog事件类型
     */
    private BinLogEventType binLogEventType;
    /**
     * 更变前的数据
     */
    private T beforeData;
    /**
     * 更变后的数据
     */
    private T afterData;
    /**
     * 数据库名称
     */
    private String databaseName;
    /**
     * 表名称
     */
    private String tableName;
    /**
     * sql语句 - 一般是DDL的时候有用
     */
    private String sql;
    /**
     * MySQL操作类型
     */
    private OperationType operationType;
}
复制代码


开发适配器层


定义顶层的适配器SPI接口:


public interface SourceAdapter<SOURCE, SINK> {
    SINK adapt(SOURCE source);
}
复制代码


接着开发适配器实现类:


// 原始字符串直接返回
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class RawStringSourceAdapter implements SourceAdapter<String, String> {
    @Override
    public String adapt(String source) {
        return source;
    }
}
// Fastjson转换
@RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of")
class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> {
    private final Class<T> klass;
    @Override
    public T adapt(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return JSON.parseObject(source, klass);
    }
}
// Facade
public enum SourceAdapterFacade {
    /**
     * 单例
     */
    X;
    private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of();
    @SuppressWarnings("unchecked")
    public <T> T adapt(Class<T> klass, String source) {
        if (klass.isAssignableFrom(String.class)) {
            return (T) I_S_A.adapt(source);
        }
        return FastJsonSourceAdapter.of(klass).adapt(source);
    }
}
复制代码


最终直接使用SourceAdapterFacade#adapt()方法即可,因为实际上绝大多数情况下只会使用原始字符串和String -> Class实例,适配器层设计可以简单点。


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
SQL 存储 关系型数据库
轻松入门MySQL:简明教程解析数据存储与管理(1)
轻松入门MySQL:简明教程解析数据存储与管理(1)
213 0
|
小程序 前端开发
阻止小程序事件冒泡的三种方法
阻止小程序事件冒泡的三种方法
1458 0
|
9月前
|
机器学习/深度学习 边缘计算 运维
机器学习在网络安全中的防护:智能化的安全屏障
机器学习在网络安全中的防护:智能化的安全屏障
444 15
|
10月前
|
人工智能 API Windows
免费部署本地AI大语言模型聊天系统:Chatbox AI + 马斯克grok2.0大模型(简单5步实现,免费且比GPT4.0更好用)
本文介绍了如何部署本地AI大语言模型聊天系统,使用Chatbox AI客户端应用和Grok-beta大模型。通过获取API密钥、下载并安装Chatbox AI、配置模型,最终实现高效、智能的聊天体验。Grok 2大模型由马斯克X-AI发布,支持超长文本上下文理解,免费且易于使用。
3488 0
|
SQL 关系型数据库 MySQL
MySQL数据库——图形化界面工具(DataGrip),SQL(2)-DML(插入、修改和删除数据)
MySQL数据库——图形化界面工具(DataGrip),SQL(2)-DML(插入、修改和删除数据)
1285 1
|
Docker 容器
一篇文章搞懂docker日志的查看
`docker logs` 命令用于查看Docker容器的日志,支持多个选项:`-f` 跟踪实时日志,`--since` 显示指定时间后的日志,`--tail` 显示指定行数(默认全部),`-t` 显示时间戳。例如,`docker logs -f --tail=200 &lt;容器ID/名称&gt;` 显示最后200行实时日志。还可以结合`grep`进行过滤,或使用`--since`和`--until`指定时间范围。日志可重定向至文件,如`docker logs &lt;容器id&gt; &gt; container_logs.txt`。
3532 6
|
前端开发 JavaScript 开发工具
vscode教程(含使用技巧、保存时自动格式化文件等设置)
vscode教程(含使用技巧、保存时自动格式化文件等设置)
1170 0
|
Kubernetes 网络协议 关系型数据库
Seata常见问题之找不到健康检查接口如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
|
canal SQL 关系型数据库
10.【canal】canal从入门到放弃-mysql+canal+rocketmq实现数据库同步-canal简单使用
【canal】canal从入门到放弃-mysql+canal+rocketmq实现数据库同步-canal简单使用
10.【canal】canal从入门到放弃-mysql+canal+rocketmq实现数据库同步-canal简单使用