概述
企业正在经历其数据资产的爆炸式增长,这些数据包括批式或流式传输的结构化、半结构化以及非结构化数据,随着海量数据批量导入的场景的增多,企业对于 Data Pipeline 的需求也愈加复杂。新一代云原生实时数仓 SelectDB Cloud 作为一款运行于多云之上的云原生实时数据仓库,致力于通过开箱即用的能力为客户带来简单快速的数仓体验。在生态方面,SelectDB Cloud 提供了丰富的数据连接器插件(Connector)来连接各种来自周边大数据工具的数据源,内置 Kafka、Flink、Spark、DataX 等常见的 Connector。基于此,企业开发者能够更加便捷的将数据移动到 SelectDB Cloud 上,并利用 SelectDB Cloud 从数据资产中获取更高的价值。
SelectDB Cloud 基于 Apache Doris 研发的新一代云原生实时数仓 SelectDB,运行于多家云上,为客户提供极简运维和极致性价比的数仓服务。
Kafka Connect For SelectDB Cloud
Kafka Connect 是一款可扩展、可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以定义 Connectors 来将大量数据迁入迁出Kafka。
SelectDB提供了 Sink Connector 插件,可以将Kafka Topic中的JSON数据保存到SelectDB数据库中。
架构
在业务场景中,通常会通过Debezium Connector将数据库的变更数据实时写入Kafka,或者调用API往Kafka中推送JSON格式数据,使用SelectDB Connector即可将这些数据同步到SelectDB数据库中。
工作原理
Kafka Connector 通过以下过程订阅 Kafka topic 的数据,并将数据 sink 到 SelectDB 中。
SelectDB Connector 通过内部的 task 一对一或一对多的消费对应 topic partition 的数据。当达到阈值(时间或内存或消息数量)时,connector 会将该批次的 records 生成一个临时文件,并上传至 SelectDB 的对象储存中。
当临时文件数达到 50 个或 connector 向 Kafka 集群预提交已成功消费的 offset 时(默认 10s ),将对象存储中临时文件的通过 Copy-Into 的操作,导入至对应的 table 中。
Exactly-Once
Exactly-Once 语义是指即使在机器或应用出现故障的情况下,也不会重复处理数据或者丢失数据。
Selectdb-kafka-connector 通过 Kafka 集群与 SelectDB 实现 Exactly_Once,具体原理如下:
kafka-connector 在初始化时会主动向 Selectdb 获取当前所在 partition 已提交的 last_offset。
从 kafka 消费数据,只有当前 record 的 offset 大于从 Selectdb 获取的 last_offset 后,才能被正常消费。当消费的 record 达到阈值,会生成一个以 last_offset 命名的临时文件,并将该文件上传至对象存储中。
在 kafka 调用执行 preCommit 时,会将对象存储中的数据由 copy-into 操作导入至 SelectDB 中,此时 SelectDB会记录已提交成功的 last_offset。
若此时 Kafka-connector 执行 copy-into 失败,则会从 Kafka 中获取当前 partition 上一次执行成功的 offset,继续消费,从而保证数据不丢不重。
成功执行 copy-into 后,向 kafka 提交记录当前 partition 已成功消费的 offset。
若此时 kafka-connector 意外挂掉,重启该 task 或其他 task 在 kafka 的分区自平衡机制下继续消费该 partition。通过初始化阶段可获取到 SelectDB 中已提交成功的 last_offset,继续消费,直至下一个 preCommit 阶段再向 kafka 提交成功消费的 offset。
使用场景
环境准备
下载并解压
wget https://archive.apache.org/dist/kafka/2.4.0/kafka_2.12-2.4.0.tgz tar -zxvf kafka_2.12-2.4.0.tgz bin/zookeeper-server-start.sh -daemon config/zookeeper.properties bin/kafka-server-start.sh -daemon config/server.properties
快速同步JSON数据
在业务场景中,Kafka中会存放业务写入的数据流,通常格式为JSON(对象/数组),使用SelectDB Sink Connector可以快速的同步数据到SelectDB数据库中。
配置SelectDB Sink
name=selectdb-sink connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector topics=test_topic selectdb.topic2table.map=test_topic:test_tbl buffer.count.records=10000 buffer.flush.time=60 buffer.size.bytes=5000000 selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com selectdb.http.port=47057 selectdb.query.port=30523 selectdb.user=admin selectdb.password=password selectdb.database=test_db selectdb.cluster=cluster_name #配置convert key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false #配置死信队列,可选 errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1
启动Kafka Connect
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/selectdb-sink.properties
使用Debezium数据同步MySQL数据到SelectDB
在很多业务场景中,经常需要从业务数据库中实时同步数据,在这个时候就需要使用数据库的变更数据捕获(Change Data Capture,简称 CDC)机制。
而Debezium是基于Kafka Connect的CDC工具,可以对接 MySQL、PostgreSQL、SQL Server、Oracle、MongoDB 等多种数据库,把数据库的数据持续以统一的格式发送到 Kafka 的Topic中,以供下游Sink端进行实时消费。
这里以MySQL为例
下载Debezium
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
配置Debezium Source
name=mysql-source connector.class=io.debezium.connector.mysql.MySqlConnector database.hostname=127.0.0.1 database.port=3306 database.user=root database.password=123456 database.server.id=1 # kafka中的该client的唯一标识 database.server.name=test #需要同步的数据库,默认是同步所有数据库 database.include.list=db database.history.kafka.bootstrap.servers=localhost:9092 #用于存储数据库表结构变化的 Kafka topic database.history.kafka.topic=dbhistory transforms=unwrap #参考 https://debezium.io/documentation/reference/stable/transformations/event-flattening.html transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState #记录删除事件 transforms.unwrap.delete.handling.mode=rewrite
配置好之后,Kafka中默认的Topic名称格式是 SERVER_NAME.DATABASE_NAME.TABLE_NAME
注:其他Debezium配置可参考
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-connector-properties
配置SelectDB Sink
name=selectdb-sink connector.class=com.selectdb.kafka.connector.SelectdbSinkConnector topics=test.db.table selectdb.topic2table.map=test.db.table:test_tbl buffer.count.records=10000 buffer.flush.time=60 buffer.size.bytes=5000000 selectdb.url=xxx.cn-beijing.privatelink.aliyuncs.com selectdb.http.port=57338 selectdb.query.port=15392 selectdb.user=admin selectdb.password=password selectdb.database=test selectdb.cluster=cluster_name #配置convert key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true #配置死信队列,可选 errors.tolerance=all errors.deadletterqueue.topic.name=test_error errors.deadletterqueue.context.headers.enable = true errors.deadletterqueue.topic.replication.factor=1
同步到SelectDB时,需要先提前创建好库表。
启动Kafka Connect
bin/connect-standalone.sh -daemon config/connect-standalone.properties config/mysql-source.properties config/selectdb-sink.properties
启动后,可以观察日志 logs/connect.log 是否启动成功。
使用效果
在调研的使用场景中,使用 kafka 同步上游 JSON 数据。这里数据维持以每秒 10w 条的超高频导入,在 8c16g 的机器上,仅部署单节点 kafka 集群,同时在 topic 中配置 20 个 partition,以 distributed 模式启动 connector。在实际处理过程中,topic 中的总体消息平均积压在 120w 条左右,单个 partition 积压 6w 条消息,表现相当优秀。
总结
整体来看,Kafka-SelectDB Connector 打通了从 kafka 直接导入数据至 SelectDB 的数据链路,降低了通过 Flink 作为中间数据同步组件的链路复杂度;通过 Exactly once 实现数据的一次性精确导入,确保了数据的准确性;通过以 Kafka 集群作为载体,在超高频的数据导入场景中,性能表现非常优秀。