1. 介绍
随着系统变得越来越复杂,我们需要更多的解决方案来集中维护大量数据,以便对其进行监控和查询,而又不会干扰运营数据库。在Yotpo,我们有许多微服务和数据库,因此将数据传输到集中式数据湖中的需求至关重要。我们一直在寻找易于使用的基础架构(仅需配置),以节省工程师的时间。
变更数据捕获(Changed Data Capture,简称为CDC)架构是指跟踪变更的数据,以便可以处理这些数据(Wiki[1])。
面临的挑战是跟踪数据库变更并且需要根据不同目的提供不同的物化视图,这对于分析(例如Apache Spark作业)、监控数据变化、搜索索引、衡量数据质量、基于基于事件的操作都可能很有用。
2. 使用CDC跟踪数据库变更
在本文中,我将逐步介绍如何在Yotpo[2]生态系统中实施Change Data Capture架构。
在开始使用CDC之前,我们维护了将数据库表全量加载到数据湖中的工作流,该工作流包括扫描全表并用Parquet文件覆盖S3目录。但该方法不可扩展,会导致数据库过载,而且很费时间。我们希望能够查询最新的数据集,并将数据放入数据湖中(例如Amazon s3[3]和Hive metastore[4]中的数据),以确保数据最终位置的正确性。采用这种架构后,我们在数据湖中获得了最新、被完全监控的生产数据库副本。
基本思路是只要数据库中发生变更(创建/更新/删除),就会提取数据库日志并将其发送至Apache Kafka[5]。物化视图作业也会消费这些事件以便使得视图保持最新状态。物化视图流作业需要消费变更才能始终在S3和Hive中拥有数据库的最新视图。当然内部工程师也可以独立消费这些更改。总的来讲,就是首先将数据库变更先导入Kafka,然后多个系统均可消费Kafka中的数据。
3. CDC-Kafka-Metorikku架构
3.1 Debezium(Kafka Connect)
第一部分是使用数据库插件(基于Kafka Connect[6]),对应架构中的Debezium,特别是它的MySQL连接器。你需要确保在“行”模式下启用了BINLOG才行(此方式是监控数据库变化的重要手段)。然后,Debezium使用JDBC连接到数据库并执行整个内容的快照。之后,每个数据的变更都会实时触发一个事件。这些事件使用Avro编码,并直接发送到Kafka。
3.2 Avro
Avro具有可以演变的模式(schema)。在数据库中添加一列可演变模式,但仍向后兼容。我们更喜欢对数据传输对象使用Avro编码,因为它非常紧凑,并且具有多种数据类型,例如JSON不支持多种数字类型和字节。
3.3 Schema Registry
这里最酷的部分之一是在此过程中模式如何变化。在注册新的数据库插件时,数据库的模式已在Schema Registry[7]中注册,它从数据库派生而来并自动将模式转换为Avro。
每当模式发生变更时,都会在Schema Registry特定表添加对应的新版本模式,这方便我们以后浏览不同的模式版本。
3.4 Apache Hudi存储格式
下一部分是处理物化视图。使用数据湖最大的挑战之一是更新现有数据集中的数据。在经典的基于文件的数据湖体系结构中,当我们要更新一行时,必须读取整个最新数据集并将其重写。Apache Hudi[8]格式是一种开源存储格式,其将ACID事务引入Apache Spark。我们选择Hudi而不是Parquet之类的其他格式,因为它允许对键表达式进行增量更新,在本例中,键表达式是表的主键。为了使Hudi正常工作,我们需要定义三个重要部分
- 键列,用于区分输入中每一行的键。
- 时间列,基于此列,Hudi将使用较新的值来更新行。
- 分区,如何对行进行分区。
3.5 Metorikku
为结合以上所有组件,我们使用了开源的Metorikku[9]库。Metorikku在Apache Spark之上简化了ETL的编写和执行,并支持多种输出格式。
Metorikku消费Kafka的Avro事件,使用Schema Registry反序列化它们,并将它们写为Hudi格式。
我们可以将Metorikku物化视图作业配置为与Hive Metastore同步,这将使我们的作业可以立即访问它。这只需使用Hudi提供开箱即用的功能和进行简单的Hive URL配置。
steps: - dataFrameName: cdc_filtered sql: SELECT ts_ms, op, before, after FROM cdc_events WHERE op IN ('d', 'u', 'c') - dataFrameName: cdc_by_event sql: SELECT ts_ms, CASE WHEN op = 'd' THEN before ELSE after END AS cdc_fields, CASE WHEN op = 'd' THEN true ELSE false END AS _hoodie_delete FROM cdc_filtered - dataFrameName: cdc_with_fields sql: SELECT ts_ms, cdc_fields.*, _hoodie_delete FROM cdc_by_event - dataFrameName: cdc_table sql: SELECT *, id AS hoodie_key, from_unixtime(created_at, 'yyyy/MM/dd') AS hoodie_partition FROM cdc_with_fields output: - dataFrameName: cdc_table outputType: Hudi outputOptions: path: table_view.parquet keyColumn: hoodie_key timeColumn: ts_ms partitionBy: hoodie_partition hivePartitions: year,month,day tableName: hoodie_test saveMode: Append
上面是物化视图的配置,它读取事件[10]并创建物化视图。你可以在我们的端到端CDC测试[11]中找到完整的docker化示例,将其运行在docker环境时你可以参考Docker compose文件(Yotpo使用Hashicorp在AWS上提供的Nomad[12])。
可查看Metorikku完整任务[13]和配置[14]文件。
3.6 监控
Kafka Connect带有开箱即用的监控功能[15],它使我们能够深入了解每个数据库连接器中发生的事情。
使用Metorikku,我们还可以监视实际数据,例如,为每个CDC表统计每种类型(创建/更新/删除)的事件数。一个Metorikku作业可以利用Kafka主题模式[16]来消费多个CDC主题。
4. 展望
对于我们上面讨论的挑战,有很多解决方案。我们集成了一些最佳解决方案以部署CDC基础架构。这使我们能够更好地管理和监控我们的数据湖,而我们也可从这里开始改进。展望未来,基础架构的功能将被扩展并支持更多数据库(如Mongo,Cassandra,PostgreSQL等)。所有工具已经存在,面临的挑战是如何将它们很好地集成在一起。当我们越依赖基础架构,那么服务、监视和数据质量检查之间协同获得的可访问性就越好。