前提
这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。
之前写的几篇文章里面其中一篇曾经提到过Canal
解析MySQL
的binlog
事件后的对象如下(来源于Canal
源码com.alibaba.otter.canal.protocol.FlatMessage
):
如果直接对此原始对象进行解析,那么会出现很多解析模板代码,一旦有改动就会牵一发动全身,这是我们不希望发生的一件事。于是花了一点点时间写了一个Canal
胶水层,让接收到的FlatMessage
根据表名称直接转换为对应的DTO
实例,这样能在一定程度上提升开发效率并且减少模板化代码,这个胶水层的数据流示意图如下:
要编写这样的胶水层主要用到:
- 反射。
- 注解。
- 策略模式。
IOC
容器(可选)。
项目的模块如下:
canal-glue-core
:核心功能。spring-boot-starter-canal-glue
:适配Spring
的IOC
容器,添加自动配置。canal-glue-example
:使用例子和基准测试。
下文会详细分析此胶水层如何实现。
引入依赖
为了不污染引用此模块的外部服务依赖,除了JSON
转换的依赖之外,其他依赖的scope
定义为provide
或者test
类型,依赖版本和BOM
如下:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>2.3.0.RELEASE</spring.boot.version> <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version> <lombok.version>1.18.12</lombok.version> <fastjson.version>1.2.73</fastjson.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <scope>import</scope> <type>pom</type> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies> 复制代码
其中,canal-glue-core
模块本质上只依赖于fastjson
,可以完全脱离spring
体系使用。
基本架构
这里提供一个"后知后觉"的架构图,因为之前为了快速怼到线上,初版没有考虑这么多,甚至还耦合了业务代码,组件是后来抽离出来的:
设计配置模块(已经移除)
设计配置模块在设计的时候考虑使用了外置配置文件和纯注解两种方式,前期使用了JSON外置配置文件的方式,纯注解是后来增加的,二选一。这一节简单介绍一下JSON外置配置文件的配置加载,纯注解留到后面处理器模块时候分析。
当初是想快速进行胶水层的开发,所以配置文件使用了可读性比较高的JSON
格式:
{ "version": 1, "module": "canal-glue", "databases": [ { "database": "db_payment_service", "processors": [ { "table": "payment_order", "processor": "x.y.z.PaymentOrderProcessor", "exceptionHandler": "x.y.z.PaymentOrderExceptionHandler" } ] }, { ...... } ] } 复制代码
JSON配置在设计的时候尽可能不要使用JSON Array作为顶层配置,因为这样做设计的对象会比较怪
因为使用该模块的应用有可能需要处理Canal
解析多个上游数据库的binlog
事件,所以配置模块设计的时候需要以database
为KEY
,挂载多个table
以及对应的表binlog
事件处理器以及异常处理器。然后对着JSON
文件的格式撸一遍对应的实体类出来:
@Data public class CanalGlueProcessorConf { private String table; private String processor; private String exceptionHandler; } @Data public class CanalGlueDatabaseConf { private String database; private List<CanalGlueProcessorConf> processors; } @Data public class CanalGlueConf { private Long version; private String module; private List<CanalGlueDatabaseConf> database; } 复制代码
实体编写完,接着可以编写一个配置加载器,简单起见,配置文件直接放ClassPath
之下,加载器如下:
public interface CanalGlueConfLoader { CanalGlueConf load(String location); } // 实现 public class ClassPathCanalGlueConfLoader implements CanalGlueConfLoader { @Override public CanalGlueConf load(String location) { ClassPathResource resource = new ClassPathResource(location); Assert.isTrue(resource.exists(), String.format("类路径下不存在文件%s", location)); try { String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); return JSON.parseObject(content, CanalGlueConf.class); } catch (IOException e) { // should not reach throw new IllegalStateException(e); } } } 复制代码
读取ClassPath
下的某个location
为绝对路径的文件内容字符串,然后使用Fasfjson
转成CanalGlueConf
对象。这个是默认的实现,使用canal-glue
模块可以覆盖此实现,通过自定义的实现加载配置。
JSON配置模块在后来从业务系统抽离此胶水层的时候已经完全废弃,使用纯注解驱动和核心抽象组件继承的方式实现。
核心模块开发
主要包括几个模块:
- 基本模型定义。
- 适配器层开发。
- 转换器和解析器层开发。
- 处理器层开发。
- 全局组件自动配置模块开发(仅限于
Spring
体系,已经抽取到spring-boot-starter-canal-glue
模块)。 CanalGlue
开发。
基本模型定义
定义顶层的KEY
,也就是对于某个数据库的某一个确定的表,需要一个唯一标识:
// 模型表对象 public interface ModelTable { String database(); String table(); static ModelTable of(String database, String table) { return DefaultModelTable.of(database, table); } } @RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of") public class DefaultModelTable implements ModelTable { private final String database; private final String table; @Override public String database() { return database; } @Override public String table() { return table; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } DefaultModelTable that = (DefaultModelTable) o; return Objects.equals(database, that.database) && Objects.equals(table, that.table); } @Override public int hashCode() { return Objects.hash(database, table); } } 复制代码
这里实现类DefaultModelTable
重写了equals()
和hashCode()
方法便于把ModelTable
实例应用为HashMap
容器的KEY
,这样后面就可以设计ModelTable -> Processor
的缓存结构。
由于Canal
投放到Kafka
的事件内容是一个原始字符串,所以要定义一个和前文提到的FlatMessage
基本一致的事件类CanalBinLogEvent
:
@Data public class CanalBinLogEvent { /** * 事件ID,没有实际意义 */ private Long id; /** * 当前更变后节点数据 */ private List<Map<String, String>> data; /** * 主键列名称列表 */ private List<String> pkNames; /** * 当前更变前节点数据 */ private List<Map<String, String>> old; /** * 类型 UPDATE\INSERT\DELETE\QUERY */ private String type; /** * binlog execute time */ private Long es; /** * dml build timestamp */ private Long ts; /** * 执行的sql,不一定存在 */ private String sql; /** * 数据库名称 */ private String database; /** * 表名称 */ private String table; /** * SQL类型映射 */ private Map<String, Integer> sqlType; /** * MySQL字段类型映射 */ private Map<String, String> mysqlType; /** * 是否DDL */ private Boolean isDdl; } 复制代码
根据此事件对象,再定义解析完毕后的结果对象CanalBinLogResult
:
// 常量 @RequiredArgsConstructor @Getter public enum BinLogEventType { QUERY("QUERY", "查询"), INSERT("INSERT", "新增"), UPDATE("UPDATE", "更新"), DELETE("DELETE", "删除"), ALTER("ALTER", "列修改操作"), UNKNOWN("UNKNOWN", "未知"), ; private final String type; private final String description; public static BinLogEventType fromType(String type) { for (BinLogEventType binLogType : BinLogEventType.values()) { if (binLogType.getType().equals(type)) { return binLogType; } } return BinLogEventType.UNKNOWN; } } // 常量 @RequiredArgsConstructor @Getter public enum OperationType { /** * DML */ DML("dml", "DML语句"), /** * DDL */ DDL("ddl", "DDL语句"), ; private final String type; private final String description; } @Data public class CanalBinLogResult<T> { /** * 提取的长整型主键 */ private Long primaryKey; /** * binlog事件类型 */ private BinLogEventType binLogEventType; /** * 更变前的数据 */ private T beforeData; /** * 更变后的数据 */ private T afterData; /** * 数据库名称 */ private String databaseName; /** * 表名称 */ private String tableName; /** * sql语句 - 一般是DDL的时候有用 */ private String sql; /** * MySQL操作类型 */ private OperationType operationType; } 复制代码
开发适配器层
定义顶层的适配器SPI
接口:
public interface SourceAdapter<SOURCE, SINK> { SINK adapt(SOURCE source); } 复制代码
接着开发适配器实现类:
// 原始字符串直接返回 @RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of") class RawStringSourceAdapter implements SourceAdapter<String, String> { @Override public String adapt(String source) { return source; } } // Fastjson转换 @RequiredArgsConstructor(access = AccessLevel.PACKAGE, staticName = "of") class FastJsonSourceAdapter<T> implements SourceAdapter<String, T> { private final Class<T> klass; @Override public T adapt(String source) { if (StringUtils.isEmpty(source)) { return null; } return JSON.parseObject(source, klass); } } // Facade public enum SourceAdapterFacade { /** * 单例 */ X; private static final SourceAdapter<String, String> I_S_A = RawStringSourceAdapter.of(); @SuppressWarnings("unchecked") public <T> T adapt(Class<T> klass, String source) { if (klass.isAssignableFrom(String.class)) { return (T) I_S_A.adapt(source); } return FastJsonSourceAdapter.of(klass).adapt(source); } } 复制代码
最终直接使用SourceAdapterFacade#adapt()
方法即可,因为实际上绝大多数情况下只会使用原始字符串和String -> Class实例
,适配器层设计可以简单点。