简化ETL工作,编写一个Canal胶水层(下)

本文涉及的产品
云解析DNS-重点域名监控,免费拨测 20万次(价值200元)
简介: 这是一篇憋了很久的文章,一直想写,却又一直忘记了写。整篇文章可能会有点流水账,相对详细地介绍怎么写一个小型的"框架"。这个精悍的胶水层已经在生产环境服役超过半年,这里尝试把耦合业务的代码去掉,提炼出一个相对简洁的版本。

核心模块开发



开发转换器和解析器层


对于Canal解析完成的binlog事件,dataold属性是K-V结构,并且KEY都是String类型,需要遍历解析才能推导出完整的目标实例。


转换后的实例的属性类型目前只支持包装类,int等原始类型不支持


为了更好地通过目标实体和实际的数据库、表和列名称、列类型进行映射,引入了两个自定义注解CanalModel@CanalField,它们的定义如下:


// @CanalModel
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface CanalModel {
    /**
     * 目标数据库
     */
    String database();
    /**
     * 目标表
     */
    String table();
    /**
     * 属性名 -> 列名命名转换策略,可选值有:DEFAULT(原始)、UPPER_UNDERSCORE(驼峰转下划线大写)和LOWER_UNDERSCORE(驼峰转下划线小写)
     */
    FieldNamingPolicy fieldNamingPolicy() default FieldNamingPolicy.DEFAULT;
}
// @CanalField
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface CanalField {
    /**
     * 行名称
     *
     * @return columnName
     */
    String columnName() default "";
    /**
     * sql字段类型
     *
     * @return JDBCType
     */
    JDBCType sqlType() default JDBCType.NULL;
    /**
     * 转换器类型
     *
     * @return klass
     */
    Class<? extends BaseCanalFieldConverter<?>> converterKlass() default NullCanalFieldConverter.class;
}
复制代码


定义顶层转换器接口BinLogFieldConverter


public interface BinLogFieldConverter<SOURCE, TARGET> {
    TARGET convert(SOURCE source);
}
复制代码


目前暂定可以通过目标属性的Class和通过注解指定的SQLType类型进行匹配,所以再定义一个抽象转换器BaseCanalFieldConverter


public abstract class BaseCanalFieldConverter<T> implements BinLogFieldConverter<String, T> {
    private final SQLType sqlType;
    private final Class<?> klass;
    protected BaseCanalFieldConverter(SQLType sqlType, Class<?> klass) {
        this.sqlType = sqlType;
        this.klass = klass;
    }
    @Override
    public T convert(String source) {
        if (StringUtils.isEmpty(source)) {
            return null;
        }
        return convertInternal(source);
    }
    /**
     * 内部转换方法
     *
     * @param source 源字符串
     * @return T
     */
    protected abstract T convertInternal(String source);
    /**
     * 返回SQL类型
     *
     * @return SQLType
     */
    public SQLType sqlType() {
        return sqlType;
    }
    /**
     * 返回类型
     *
     * @return Class<?>
     */
    public Class<?> typeKlass() {
        return klass;
    }
}
复制代码


BaseCanalFieldConverter是面向目标实例中的单个属性的,例如对于实例中的Long类型的属性,可以实现一个BigIntCanalFieldConverter


public class BigIntCanalFieldConverter extends BaseCanalFieldConverter<Long> {
    /**
     * 单例
     */
    public static final BaseCanalFieldConverter<Long> X = new BigIntCanalFieldConverter();
    private BigIntCanalFieldConverter() {
        super(JDBCType.BIGINT, Long.class);
    }
    @Override
    protected Long convertInternal(String source) {
        if (null == source) {
            return null;
        }
        return Long.valueOf(source);
    }
}
复制代码


其他类型以此类推,目前已经开发好的最常用的内建转换器如下:


JDBCType JAVAType 转换器
NULL Void NullCanalFieldConverter
BIGINT Long BigIntCanalFieldConverter
VARCHAR String VarcharCanalFieldConverter
DECIMAL BigDecimal DecimalCanalFieldConverter
INTEGER Integer IntCanalFieldConverter
TINYINT Integer TinyIntCanalFieldConverter
DATE java.time.LocalDate SqlDateCanalFieldConverter0
DATE java.sql.Date SqlDateCanalFieldConverter1
TIMESTAMP java.time.LocalDateTime TimestampCanalFieldConverter0
TIMESTAMP java.util.Date TimestampCanalFieldConverter1
TIMESTAMP java.time.OffsetDateTime TimestampCanalFieldConverter2


所有转换器实现都设计为无状态的单例,方便做动态注册和覆盖。接着定义一个转换器工厂CanalFieldConverterFactory,提供API通过指定参数加载目标转换器实例:


// 入参
@SuppressWarnings("rawtypes")
@Builder
@Data
public class CanalFieldConvertInput {
    private Class<?> fieldKlass;
    private Class<? extends BaseCanalFieldConverter> converterKlass;
    private SQLType sqlType;
    @Tolerate
    public CanalFieldConvertInput() {
    }
}
// 结果
@Builder
@Getter
public class CanalFieldConvertResult {
    private final BaseCanalFieldConverter<?> converter;
}
// 接口
public interface CanalFieldConverterFactory {
    default void registerConverter(BaseCanalFieldConverter<?> converter) {
        registerConverter(converter, true);
    }
    void registerConverter(BaseCanalFieldConverter<?> converter, boolean replace);
    CanalFieldConvertResult load(CanalFieldConvertInput input);
}
复制代码


CanalFieldConverterFactory提供了可以注册自定义转化器的registerConverter()方法,这样就可以让使用者注册自定义的转换器和覆盖默认的转换器。


至此,可以通过指定的参数,加载实例属性的转换器,拿到转换器实例,就可以针对目标实例,从原始事件中解析对应的K-V结构。接着需要编写最核心的解析器模块,此模块主要包含三个方面:


  • 唯一BIGINT类型主键的解析(这一点是公司技术规范的一条铁规则,MySQL每个表只能定义唯一的BIGINT UNSIGNED自增趋势主键)。
  • 更变前的数据,对应于原始事件中的old属性节点(不一定存在,例如INSERT语句中不存在此属性节点)。
  • 更变后的数据,对应于原始事件中的data属性节点。


定义解析器接口CanalBinLogEventParser如下:


public interface CanalBinLogEventParser {
    /**
     * 解析binlog事件
     *
     * @param event               事件
     * @param klass               目标类型
     * @param primaryKeyFunction  主键映射方法
     * @param commonEntryFunction 其他属性映射方法
     * @return CanalBinLogResult
     */
    <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event,
                                         Class<T> klass,
                                         BasePrimaryKeyTupleFunction primaryKeyFunction,
                                         BaseCommonEntryFunction<T> commonEntryFunction);
}
复制代码


解析器的解析方法依赖于:

  • binlog事件实例,这个是上游的适配器组件的结果。
  • 转换的目标类型。
  • BasePrimaryKeyTupleFunction主键映射方法实例,默认使用内建的BigIntPrimaryKeyTupleFunction
  • BaseCommonEntryFunction非主键通用列-属性映射方法实例,默认使用内建的ReflectionBinLogEntryFunction这个是非主键列的转换核心,里面使用到了反射)。


解析返回结果是一个List,原因是FlatMessage在批量写入的时候的数据结构本来就是一个List<Map<String,String>>,这里只是"顺水推舟"。


开发处理器层


处理器是开发者处理最终解析出来的实体的入口,只需要面向不同类型的事件选择对应的处理方法即可,看起来如下:


public abstract class BaseCanalBinlogEventProcessor<T> extends BaseParameterizedTypeReferenceSupport<T> {
    protected void processInsertInternal(CanalBinLogResult<T> result) {
    }
    protected void processUpdateInternal(CanalBinLogResult<T> result) {
    }
    protected void processDeleteInternal(CanalBinLogResult<T> result) {
    }
    protected void processDDLInternal(CanalBinLogResult<T> result) {
    }
}
复制代码


例如需要处理Insert事件,则子类继承BaseCanalBinlogEventProcessor,对应的实体类(泛型的替换)使用@CanalModel注解声明,然后覆盖processInsertInternal()方法即可。期间子处理器可以覆盖自定义异常处理器实例,如:


@Override
protected ExceptionHandler exceptionHandler() {
    return EXCEPTION_HANDLER;
}
/**
    * 覆盖默认的ExceptionHandler.NO_OP
    */
private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
        -> log.error("解析binlog事件出现异常,事件内容:{}", JSON.toJSONString(event), throwable);
复制代码


另外,有些场景需要对回调前或者回调后的结果做特化处理,因此引入了解析结果拦截器(链)的实现,对应的类是BaseParseResultInterceptor


public abstract class BaseParseResultInterceptor<T> extends BaseParameterizedTypeReferenceSupport<T> {
    public BaseParseResultInterceptor() {
        super();
    }
    public void onParse(ModelTable modelTable) {
    }
    public void onBeforeInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onAfterInsertProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onBeforeUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onAfterUpdateProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onBeforeDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onAfterDeleteProcess(ModelTable modelTable, T beforeData, T afterData) {
    }
    public void onBeforeDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
    }
    public void onAfterDDLProcess(ModelTable modelTable, T beforeData, T afterData, String sql) {
    }
    public void onParseFinish(ModelTable modelTable) {
    }
    public void onParseCompletion(ModelTable modelTable) {
    }
}
复制代码


解析结果拦截器的回调时机可以参看上面的架构图或者BaseCanalBinlogEventProcessor的源代码。


开发全局组件自动配置模块


如果使用了Spring容器,需要添加一个配置类来加载所有既有的组件,添加一个全局配置类CanalGlueAutoConfiguration(这个类可以在项目的spring-boot-starter-canal-glue模块中看到,这个模块就只有一个类):


@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {
    private ConfigurableListableBeanFactory configurableListableBeanFactory;
    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return InMemoryCanalBinlogEventProcessorFactory.of();
    }
    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }
    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }
    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return DefaultCanalBinLogEventParser.of();
    }
    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }
    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return DefaultCanalGlue.of(canalBinlogEventProcessorFactory);
    }
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager
                = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager
                = configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
                = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser
                = configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
        Map<String, BaseParseResultInterceptor> interceptors
                = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
        interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
        Map<String, BaseCanalBinlogEventProcessor> processors
                = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
        processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
                canalBinlogEventProcessorFactory, parseResultInterceptorManager));
    }
}
复制代码


为了更好地让其他服务引入此配置类,可以使用spring.factories的特性。新建resources/META-INF/spring.factories文件,内容如下:


org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.throwx.canal.gule.config.CanalGlueAutoConfiguration
复制代码


这样子通过引入spring-boot-starter-canal-glue就可以激活所有用到的组件并且初始化所有已经添加到Spring容器中的处理器。


CanalGlue开发



CanalGlue其实就是提供binlog事件字符串的处理入口,目前定义为一个接口:


public interface CanalGlue {
    void process(String content);
}
复制代码


此接口的实现DefaultCanalGlue也十分简单:


@RequiredArgsConstructor(access = AccessLevel.PUBLIC, staticName = "of")
public class DefaultCanalGlue implements CanalGlue {
    private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;
    @Override
    public void process(String content) {
        CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
        ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
        canalBinlogEventProcessorFactory.get(modelTable).forEach(processor -> processor.process(event));
    }
}
复制代码


使用源适配器把字符串转换为CanalBinLogEvent实例,再委托处理器工厂寻找对应的BaseCanalBinlogEventProcessor列表去处理输入的事件实例。


使用canal-glue



主要包括下面几个维度,都在canal-glue-exampletest包下:

  • 一般情况下使用处理器处理INSERT事件。
  • 自定义针对DDL变更的预警父处理器,实现DDL变更预警。
  • 单表对应多个处理器。
  • 使用解析结果处理器针对特定字段进行AES加解密处理。
  • Spring容器下,一般编程式使用。
  • 使用openjdk-jmh进行Benchmark基准性能测试。


这里简单提一下在Spring体系下的使用方式,引入依赖spring-boot-starter-canal-glue


<dependency>
    <groupId>cn.throwx</groupId>
    <artifactId>spring-boot-starter-canal-glue</artifactId>
    <version>版本号</version>
</dependency>
复制代码


编写一个实体或者DTOOrderModel


@Data
@CanalModel(database = "db_order_service", table = "t_order", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public static class OrderModel {
    private Long id;
    private String orderId;
    private OffsetDateTime createTime;
    private BigDecimal amount;
}
复制代码


这里使用了@CanalModel注解绑定了数据库db_order_service和表t_order,属性名-列名映射策略为驼峰转小写下划线。接着定义一个处理器OrderProcessor和自定义异常处理器(可选,这里是为了模拟在处理事件的时候抛出自定义异常):


@Component
public class OrderProcessor extends BaseCanalBinlogEventProcessor<OrderModel> {
    @Override
    protected void processInsertInternal(CanalBinLogResult<OrderModel> result) {
        OrderModel orderModel = result.getAfterData();
        logger.info("接收到订单保存binlog,主键:{},模拟抛出异常...", orderModel.getId());
        throw new RuntimeException(String.format("[id:%d]", orderModel.getId()));
    }
    @Override
    protected ExceptionHandler exceptionHandler() {
        return EXCEPTION_HANDLER;
    }
    /**
        * 覆盖默认的ExceptionHandler.NO_OP
        */
    private static final ExceptionHandler EXCEPTION_HANDLER = (event, throwable)
            -> log.error("解析binlog事件出现异常,事件内容:{}", JSON.toJSONString(event), throwable);
}
复制代码


假设一个写入订单数据的binlog事件如下:


{
  "data": [
    {
      "id": "1",
      "order_id": "10086",
      "amount": "999.0",
      "create_time": "2020-03-02 05:12:49"
    }
  ],
  "database": "db_order_service",
  "es": 1583143969000,
  "id": 3,
  "isDdl": false,
  "mysqlType": {
    "id": "BIGINT",
    "order_id": "VARCHAR(64)",
    "amount": "DECIMAL(10,2)",
    "create_time": "DATETIME"
  },
  "old": null,
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": -5,
    "order_id": 12,
    "amount": 3,
    "create_time": 93
  },
  "table": "t_order",
  "ts": 1583143969460,
  "type": "INSERT"
}
复制代码


执行结果如下:


网络异常,图片无法展示
|


如果直接对接Canal投放到KafkaTopic也很简单,配合Kafka的消费者使用的示例如下:


@Slf4j
@Component
@RequiredArgsConstructor
public class CanalEventListeners {
    private final CanalGlue canalGlue;
    @KafkaListener(
            id = "${canal.event.order.listener.id:db-order-service-listener}",
            topics = "db_order_service", 
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void onCrmMessage(String content) {
        canalGlue.process(content);
    }    
}
复制代码


小结



笔者开发这个canal-glue的初衷是需要做一个极大提升效率的大型字符串转换器,因为刚刚接触到"小数据"领域,而且人手不足,而且需要处理下游大量的报表,因为不可能花大量人力在处理这些不停重复的模板化代码上。虽然整体设计还不是十分优雅,至少在提升开发效率这个点上canal-glue做到了。


项目仓库:

仓库最新代码暂时放在develop分支


(本文完 c-15-d e-a-20201005 鸽了快一个月)


相关文章
|
11月前
|
SQL Java 关系型数据库
【📕分布式锁通关指南 01】从解决库存超卖开始加锁的初体验
本文通过电商场景中的库存超卖问题,深入探讨了JVM锁、MySQL悲观锁和乐观锁的实现及其局限性。首先介绍了单次访问下库存扣减逻辑的正常运行,但在高并发场景下出现了超卖问题。接着分析了JVM锁在多例模式、事务模式和集群模式下的失效情况,并提出了使用数据库锁机制(如悲观锁和乐观锁)来解决并发问题。 悲观锁通过`update`语句或`select for update`实现,能有效防止超卖,但存在锁范围过大、性能差等问题。乐观锁则通过版本号或时间戳实现,适合读多写少的场景,但也面临高并发写操作性能低和ABA问题。 最终,文章强调没有完美的方案,只有根据具体业务场景选择合适的锁机制。
382 12
【📕分布式锁通关指南 01】从解决库存超卖开始加锁的初体验
|
3月前
|
人工智能 数据可视化 数据处理
从拖拽到架构:低代码如何兼顾速度、灵活性与可控边界
在敏捷开发背景下,低代码技术已从工具层面演进为架构级交付要素,重构IT治理与系统边界管理。它推动开发职能在专业开发者、业务人员与运维管理者之间重新分配,带来灵活性的同时也引发架构平衡、系统可维护性与治理统一等挑战。唯有在清晰架构与治理规则下,低代码才能真正成为敏捷开发的助推器,而非技术债务来源。本文深入探讨低代码在可视化工作流、模型驱动开发、数据处理、AI融合、插件生态、开放架构与企业功能等方面的技术实现与业务价值。
从拖拽到架构:低代码如何兼顾速度、灵活性与可控边界
|
运维 供应链 前端开发
开发一个 ERP
【9月更文第5天】开发一个 ERP (Enterprise Resource Planning) 系统是一项复杂的工程,涉及到多个业务流程的集成与优化。ERP 系统旨在帮助企业整合财务、人力资源、采购、销售、库存管理和生产计划等多个部门的数据,从而提高运营效率和决策质量。本文将带你一起体验从零开始开发一个简单的 ERP 系统,并通过示例代码来说明关键组件的设计与实现。
823 3
|
关系型数据库 MySQL Linux
Navicat 连接 Windows、Linux系统下的MySQL 各种错误,修改密码。
使用Navicat连接Windows和Linux系统下的MySQL时可能遇到的四种错误及其解决方法,包括错误代码2003、1045和2013,以及如何修改MySQL密码。
1229 0
|
Java
java中File转为MultipartFile的问题解决
java中File转为MultipartFile的问题解决
750 2
|
Java 应用服务中间件 数据库连接
【一文搞懂Servlet】
【一文搞懂Servlet】
基于PI控制的PMSM永磁同步电机控制系统simulink建模与仿真
该文探讨了基于PI控制的PMSM永磁同步电机Simulink建模与仿真,采用矢量控制策略,不依赖Simulink内置模型。在MATLAB2022a环境下,建立了电机数学模型,简化了复杂的电磁关系。PI控制器用于实现电流解耦控制,提高动态响应。控制系统通过PI调节直轴和交轴电流,经坐标变换和PWM调制驱动电机运行,实现高性能闭环控制。
|
负载均衡 监控 API
实时计算 Flink版产品使用合集之Akka RPC 压力过大,除了增大心跳超时,是否有其他解决方法
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
Flink CDC产品常见问题之使用cdc-Oracle连接器报错如何解决