通过流处理平台Kafka与云原生数据仓库PostgreSQL做实时数据交互-阿里云开发者社区

开发者社区> 数据库> 正文

通过流处理平台Kafka与云原生数据仓库PostgreSQL做实时数据交互

简介: 本文介绍如何基于流处理平台Kafka与云原生数据仓库AnalyticDB PostgreSQL做实时数据交互

一、概述

Apache Kafka是一种开源流数据处理平台,因为其部署简单、性能良好的特性得到广泛应用。本文介绍基于Apache Kafka平台将按约定格式与云原生数据仓库PostgreSQL版做实时数据交互,同步数据至云原生数据仓库PostgreSQL版(以下简称ADBPG)的链路。
本文内容安排如下:第二章“背景知识”会介绍本链路中组件的基础知识以及一些名词解释,第三章“原理与架构”会介绍链路的基本架构以及工作原理,第四章“开发入门”会介绍实际的搭建部署步骤,第五章“格式约定”会介绍通过本链路写入的数据格式要求,第六章“参数配置”会给出本链路涉及的一些参数定义并给出推荐的配置值,第七章介绍本链路的可用性和错误恢复情况,第八章介绍本链路的exactly once保证情况。

二、背景知识

1)Kafka

Kafka是一种开源流处理平台,是Apache社区的顶级项目之一,在流处理领域得到广泛应用。本链路基于Kafka平台实现到ADBPG的数据同步,使用时用户将数据写入Kafka topic,本链路会按配置文件消费对应topic内的数据,并将数据同步至ADBPG。

2)Zookeeper

Zookeeper是Apache社区的一套开源分布式服务框架,Kafka集群依赖Zookeeper存储消息队列的状态信息。

3)Kafka connect

Kafka connect是kafka社区维护的一套用于数据数据同步的框架(注意是同步,如果做etl更应该使用flink或spark),用于同步像关系数据库到HDFS这样的链路。Kafka Connect基于Kafka消息队列存储状态信息,并支持故障恢复和task重分布,并不直接依赖于Zookeeper。

3-1)Connectors

Connector分为Source端和Sink端,通过将"Connectors"拼接可以定义数据从哪里读出或者写入哪里。一个source connector可以看做是将外部源数据读入特定kafka topic的接入器,一个sink connector可以看做是将kafka topic数据导出到外部的导出器。
kafka connect还包含kafka instance的概念,一个"connector instance"是负责管理Kafka和其他系统之间的进行数据复制的逻辑概念(更像是其他框架的job概念)。所有这些用于读取或写入的实现都被定义为“connector插件”。"connector instance"和"connector插件"都可以被称为"connectors"。

3-2)Task

Tasks是Connect数据模型中的负责数据拷贝的主要概念。每个Connector instance都是由一系列tasks组成。有了Task概念,kafka connect可以将一个connector拆分成多个task来并行化执行。Task之间无状态,task执行的中间状态存储在特定的topics里面(config.storage.topic和status.storage.topic)。任何时候有task被start、stop或restart,通过这种方式来提供弹性,可扩展的数据pipeline。
5.png
注意kafka connect可以用分布式和standalone两种方式部署。如果是standalone模式,数据被存储在本地磁盘,反之则存储在kafka topic内(用于控制已消费的状态用于rebalance的status.storage.topic,和存储connecotor配置的status.storage.topic)。

3-3)Worker

Connector和task都是逻辑单位,必须被调度在一个Kafka connect worker进程内。有两种部署方式,standalone和distributed。因为最终是用于生产环境,我们只讨论distributed模式。
和其他分布式框架一样,Distributed模式为kafka connect提供了扩展性和错误容忍的特性支持。已经启动的worker使用group.id配置来代表启动的worker是在同一个集群上。如果向集群中添加了worker或worker挂掉,其他的worker会感知到并且自动发起投票,将task重新分配其他可用的机器上。

三、原理与架构

6.png
基于Kafka将数据同步至ADBPG的链路工作流程如上图。
源端通过各种链路将数据写入某个Kafka Topic后,本链路对应启动Adbpg Sink Connector按配置文件的配置读取对应的Kafka Topic的数据。SinkTask会消费对应topic内的数据,更新存储于Kafka内的消费位点记录(offset),解析对应的数据并转化为JDBC操作发送到云原生数据仓库ADBPG。
本链路不会对源端的写入方式和源端类别进行限制,只对源端写入Kafka的数据格式进行约定(详见第五章),用户可以直接将文件或者单条消息通过管道丢入Kafka Topic,也可以通过开源社区(比如DebeziumConfluent)丰富的source connector资源,从MySQL/PostgreSQL/SQLServer等数据库实时同步数据。

四、开发入门

0)版本要求

组件 版本要求 推荐版本
Java 8+ 8
Zookeeper 3.4+ 3.4.14
Kafka kafka_2.11-2.1.1+ kafka_2.11-2.1.1
Kakfa connect 一般采用kafka内置组件,与kafka保持一致即可 一般采用kafka内置组件,与kafka保持一致即可
ADBPG 6.0 6.0

1)安装Java并配置环境变量

首先需要去官网下载jdk安装包并安装到对应平台,并配置好JAVA_HOME环境变量。安装完毕后可以通过输入java -version检查是否已经配置好java环境。
7.png

2)安装zookeeper

需要去zookeeper官网下载安装包并对应安装配置。
安装完毕后可以执行path_to_zookeeper/bin/zkServer.sh status 检查zookeeper是否正常运行

3)安装kafka集群

需要去官网下载kafka安装包,并对应安装配置。安装完毕后,可以执行命令查看那kafka集群当前topic列表。
path_to_zookeeper/bin/kafka-topics.sh --list --zookeeper localhost:2181
这里提供一个参考安装包,可用于核对版本以及参考配置
https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka2_1_1.tar.gz

4)安装kafka connect jar包

下载jar包,并放置入path_to_kafka/plugin_jars路径下。

下载地址:https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka-connect-jdbc-5.3.1-jar-with-dependencies.jar

5)创建ADBPG集群及目标表

5-1)开通ADBPG实例

如果尚未开通ADBPG实例,请参照以下文档开通6.0版本ADBPG实例(4.3版本不支持),并创建初始账号:
https://help.aliyun.com/document_detail/50200.html?spm=a2c4g.11186623.6.557.133412bfHEMkSR
为了达到最佳的写入性能和安全性,建议kafka集群与要写入的ADB PG实例位于同一region、同一VPC下,通过内网通信。

5-2)设置ADBPG实例白名单

ADBPG实例需要设置白名单保证Kafka集群能够访问ADBPG实例:在ADBPG控制台点击进入所创建的ADBPG实例,点击数据安全性-添加白名单分组。
8.png
将对应的网段添加进ADBPG实例白名单,点击确定。

5-3)连接ADBPG实例并创建要写入的目标表

执行写入任务前需要在ADB PG实例中创建要写入的目标表,这里给出一个例子:

create table test15(                  
b1 bigint,
b2 smallint,
b3 smallint,
b4 int,
b5 boolean,
b6 real,                      
b7 double precision,          
b8 double precision,          
b9 date,
b10 time with time zone,
b11 timestamp with time zone,
b12 text,
b15 json
);

6)启动kafka connect service

参考官网文档配置kafka conenct service并启动,这里提供命令参考:
nohup bin/connect-distributed.sh config/connect-distributed.properties >> connect.log
启动过程中,kafka connect service会不断打印出现有plugin的加载过程,以及一些全局参数配置:
9.png
Kafka connect启动时需要提供配置文件,这里提供一份配置的demo,各个参数的详细含义见第六章。

vim path_to_kafka/config/connect-distributed.properties
group.id=connect-cluster
# The name of the Kafka topic where connector offsets are stored
offset.storage.topic=connect-offsets
# Replication factor used when creating the offset storage topic
offset.storage.replication.factor=1
# Interval at which to try committing offsets for tasks.
offset.flush.interval.ms=10000
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
bootstrap.servers=localhost:9092
# List of paths separated by commas (,) that contain plugins 
plugin.path=path_to_kafka/plugin_jars
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
config.storage.topic=config-storage
status.storage.topic=config-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

7)启动adbpg sink connector
配置json格式的adbpg sink connector配置文件,并参考官方文档向kafka connect提交adbpg sink任务,这里提供命令参考:
配置文档实例如下,全部参数配置请参考第六章。

{
     "name":"adb4pg-jdbc-sink",
     "config": {
        "name":"adb4pg-jdbc-sink",
        "topics":"server1.dbo.t1",
        "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
        "connection.url":"jdbc:postgresql://yourinstance-url:port/yourdbname",
        "connection.user":"yourname",
        "connection.password":"******",
        "col.names":"a,b,c,d,e,f,g",
        "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
        "pk.fields":"a",
        "schemas.enable" : "false",
        "target.tablename":"t1",
        "tasks.max":"1",
        "auto.create":"false",
        "table.name.format":"t1",
        "batch.size":"1",
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
        }
}

启动adbpg sink connector:

#查看当前集群kafka connector列表:
curl 127.0.0.1:8083/connectors

#提交配置文件(这里以adb4pg-sink.json为例),并启动adbpg sink connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-sink.json 

#删除connectors
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-sink

#查看connector状态
curl http://localhost:8083/connectors/adb4pg-jdbc-sink/status

10.png

五、格式约定

本链路支持对目标库进行三种操作:insert、delete和update,对于单条操作对应的kafka消息的格式要求为json,json内各个字段的数据格式要求如下。

操作类别 op字段 before字段 after字段 demo
insert c 无限制 要插入记录列名与取值对应的取值对应的字典 {"before": null,"after": {"id": 1,"name": "44"},"op": "c"}
delete d 要删除记录列名与取值对应的取值对应的字典 无限制 {"after": null,"before": {"id": 1,"name": "44"},"op": "d"}
update u 所要更新的原始数据的列名与取值对应的取值对应的字典 更新后记录列名与取值对应的取值对应的字典 {"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u"}

对于某个topic内含有如下消息数据:

{"before": null,"after": {"id": 1,"name": "44"},"op": "c","transaction": null}
{"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u","transaction": null}
{"after": null,"before": {"id": 1,"name": "44"},"op": "d","transaction": null}

对应会产生的同步操作为:本链路对应会向目标库表插入一条id=1,name='44' 的数据;然后按id=2更新数据,将id更新为1,name更新为'44';最后会向目标表删除id=1,name='44'的数据。

六、参数配置

1)zookeeper参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考zookeeper官方文档

参数名 参数含义 推荐值
dataDir 数据目录
dataLogDir 日志目录
clientPort 端口 2181
server.x 各个节点配置 与实际的机器相符

2) kafka参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka官方文档。

参数名 参数含义 推荐值
broker.id 各个broker的唯一id
log.dirs 日志路径
offsets.topic.replication.factor 副本数 3
host.name 当前机器
host 与实际相符
port 端口 9092
zookeeper.connect 所连接的zk地址 与实际的zk地址相符
auto.create.topics.enable 是否允许自动创建topic true
log.retention.hours topic内数据过期时间,单位h 72
log.cleanup.policy topic内过期数据清理策略 delete,即为删除

3)kafka connect参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka connect官方文档。

参数名 参数含义 推荐值
group.id 集群id,kafka connect集群上所有节点取值必须统一,标志属于同一集群
offset.storage.topic 记录kafka-connect位点的topic名称
offset.storage.replication.factor kafka connect相关topic副本数 3
offset.flush.interval.ms kafka connect flush间隔,单位ms 10000
bootstrap.servers 连接到的kafka集群地址 与实际集群相符
plugin.path kafka connector jar包路径

4)adbpg sink connector参数配置

adbpg sink connector支持参数配置如下:

参数名 参数含义 默认值 推荐值
name kafka connector命名
topics 源数据所在topic 与源数据所在topic相符
connector.class connector所对应的实现类 必须写io.confluent.connect.jdbc.Adb4PgSinkConnector
connector.url 目标ADBPG库的jdbc地址
connector.user ADBPG用户名
connector.password ADBPG密码
col.names 写入ADBPG目标表列名
col.types 写入ADBPG目标表列类型
pk.fields 主键列表,以逗号分割
tasks.max 启动线程数 1 1
target.tablename 写入ADBPG目标表名
writeMode 写入方式,取值为insert或upsert insert insert

七、可用性和错误恢复

1)高可用

组件 是否支持高可用 故障切换方式 依赖情况
zookeeper leader挂掉会根据paxos协议重新选主,单个follower挂掉不影响可用性。 不依赖其他组件
kafka 写入多副本,单个broker挂掉不影响可用性。 kafka消息队列会向zk写入状态量,zk不可用影响kafka集群可用性;
kafka connect 由kafka集群维护kafka connect的可用性,kafka connect挂掉之后会触发task rebalance,进行task迁移,单个kafka connect挂掉不影响可用性。 kafka connect会向kafka topic写入消费到的位点(offset),kafka集群不可用影响kafka connect可用性;kafka connect不会向zk写入数据。

2)错误恢复

kafka/kafka connect/zookeeper三个服务组件均支持高可用,三副本配置下,挂掉任意一个或两个节点均可自动恢复任务进度。若同组件三个节点同时挂掉,通常为硬件或网络故障,需要按具体情况排查。

3)Task rebalancing

为了“负载均衡”每个worker上面的task数量和机器负载,kafka connect也有一个rebalance的概念,负责在connecotor启动时,有worker挂掉时,将task迁移到更空闲的worker上。但是要注意,如果是task fail,不会触发rebalance,这种时候通常是代码或数据出现了异常需要人工介入处理。
11.png

八、幂等性/exactly once保证

在目标表配置主键,并且配置writeMode='upsert'的场景下,本链路能够保证exactly-once;在其他场景下,本链路保证at-least-once.
kafka connector task发生失败,kafka自身提供的offset机制进行recovery通常只能恢复到最近一次提交的offset,而不是失败时的状态。upsert模式可以避免有主键的表约束冲突或数据重复的情况。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章