通过流处理平台Kafka与云原生数据仓库PostgreSQL做实时数据交互

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: 本文介绍如何基于流处理平台Kafka与云原生数据仓库AnalyticDB PostgreSQL做实时数据交互

一、概述

Apache Kafka是一种开源流数据处理平台,因为其部署简单、性能良好的特性得到广泛应用。本文介绍基于Apache Kafka平台将按约定格式与云原生数据仓库PostgreSQL版做实时数据交互,同步数据至云原生数据仓库PostgreSQL版(以下简称ADBPG)的链路。
本文内容安排如下:第二章“背景知识”会介绍本链路中组件的基础知识以及一些名词解释,第三章“原理与架构”会介绍链路的基本架构以及工作原理,第四章“开发入门”会介绍实际的搭建部署步骤,第五章“格式约定”会介绍通过本链路写入的数据格式要求,第六章“参数配置”会给出本链路涉及的一些参数定义并给出推荐的配置值,第七章介绍本链路的可用性和错误恢复情况,第八章介绍本链路的exactly once保证情况。

二、背景知识

1)Kafka

Kafka是一种开源流处理平台,是Apache社区的顶级项目之一,在流处理领域得到广泛应用。本链路基于Kafka平台实现到ADBPG的数据同步,使用时用户将数据写入Kafka topic,本链路会按配置文件消费对应topic内的数据,并将数据同步至ADBPG。

2)Zookeeper

Zookeeper是Apache社区的一套开源分布式服务框架,Kafka集群依赖Zookeeper存储消息队列的状态信息。

3)Kafka connect

Kafka connect是kafka社区维护的一套用于数据数据同步的框架(注意是同步,如果做etl更应该使用flink或spark),用于同步像关系数据库到HDFS这样的链路。Kafka Connect基于Kafka消息队列存储状态信息,并支持故障恢复和task重分布,并不直接依赖于Zookeeper。

3-1)Connectors

Connector分为Source端和Sink端,通过将"Connectors"拼接可以定义数据从哪里读出或者写入哪里。一个source connector可以看做是将外部源数据读入特定kafka topic的接入器,一个sink connector可以看做是将kafka topic数据导出到外部的导出器。
kafka connect还包含kafka instance的概念,一个"connector instance"是负责管理Kafka和其他系统之间的进行数据复制的逻辑概念(更像是其他框架的job概念)。所有这些用于读取或写入的实现都被定义为“connector插件”。"connector instance"和"connector插件"都可以被称为"connectors"。

3-2)Task

Tasks是Connect数据模型中的负责数据拷贝的主要概念。每个Connector instance都是由一系列tasks组成。有了Task概念,kafka connect可以将一个connector拆分成多个task来并行化执行。Task之间无状态,task执行的中间状态存储在特定的topics里面(config.storage.topic和status.storage.topic)。任何时候有task被start、stop或restart,通过这种方式来提供弹性,可扩展的数据pipeline。
5.png
注意kafka connect可以用分布式和standalone两种方式部署。如果是standalone模式,数据被存储在本地磁盘,反之则存储在kafka topic内(用于控制已消费的状态用于rebalance的status.storage.topic,和存储connecotor配置的status.storage.topic)。

3-3)Worker

Connector和task都是逻辑单位,必须被调度在一个Kafka connect worker进程内。有两种部署方式,standalone和distributed。因为最终是用于生产环境,我们只讨论distributed模式。
和其他分布式框架一样,Distributed模式为kafka connect提供了扩展性和错误容忍的特性支持。已经启动的worker使用group.id配置来代表启动的worker是在同一个集群上。如果向集群中添加了worker或worker挂掉,其他的worker会感知到并且自动发起投票,将task重新分配其他可用的机器上。

三、原理与架构

6.png
基于Kafka将数据同步至ADBPG的链路工作流程如上图。
源端通过各种链路将数据写入某个Kafka Topic后,本链路对应启动Adbpg Sink Connector按配置文件的配置读取对应的Kafka Topic的数据。SinkTask会消费对应topic内的数据,更新存储于Kafka内的消费位点记录(offset),解析对应的数据并转化为JDBC操作发送到云原生数据仓库ADBPG。
本链路不会对源端的写入方式和源端类别进行限制,只对源端写入Kafka的数据格式进行约定(详见第五章),用户可以直接将文件或者单条消息通过管道丢入Kafka Topic,也可以通过开源社区(比如DebeziumConfluent)丰富的source connector资源,从MySQL/PostgreSQL/SQLServer等数据库实时同步数据。

四、开发入门

0)版本要求

组件 版本要求 推荐版本
Java 8+ 8
Zookeeper 3.4+ 3.4.14
Kafka kafka_2.11-2.1.1+ kafka_2.11-2.1.1
Kakfa connect 一般采用kafka内置组件,与kafka保持一致即可 一般采用kafka内置组件,与kafka保持一致即可
ADBPG 6.0 6.0

1)安装Java并配置环境变量

首先需要去官网下载jdk安装包并安装到对应平台,并配置好JAVA_HOME环境变量。安装完毕后可以通过输入java -version检查是否已经配置好java环境。
7.png

2)安装zookeeper

需要去zookeeper官网下载安装包并对应安装配置。
安装完毕后可以执行path_to_zookeeper/bin/zkServer.sh status 检查zookeeper是否正常运行

3)安装kafka集群

需要去官网下载kafka安装包,并对应安装配置。安装完毕后,可以执行命令查看那kafka集群当前topic列表。
path_to_zookeeper/bin/kafka-topics.sh --list --zookeeper localhost:2181

4)安装kafka connect jar包

下载jar包,并放置入path_to_kafka/plugin_jars路径下。

5)创建ADBPG集群及目标表

5-1)开通ADBPG实例

如果尚未开通ADBPG实例,请参照以下文档开通6.0版本ADBPG实例(4.3版本不支持),并创建初始账号:
https://help.aliyun.com/document_detail/50200.html?spm=a2c4g.11186623.6.557.133412bfHEMkSR
为了达到最佳的写入性能和安全性,建议kafka集群与要写入的ADB PG实例位于同一region、同一VPC下,通过内网通信。

5-2)设置ADBPG实例白名单

ADBPG实例需要设置白名单保证Kafka集群能够访问ADBPG实例:在ADBPG控制台点击进入所创建的ADBPG实例,点击数据安全性-添加白名单分组。
8.png
将对应的网段添加进ADBPG实例白名单,点击确定。

5-3)连接ADBPG实例并创建要写入的目标表

执行写入任务前需要在ADB PG实例中创建要写入的目标表,这里给出一个例子:

create table test15(                  
b1 bigint,
b2 smallint,
b3 smallint,
b4 int,
b5 boolean,
b6 real,                      
b7 double precision,          
b8 double precision,          
b9 date,
b10 time with time zone,
b11 timestamp with time zone,
b12 text,
b15 json
);

6)启动kafka connect service

参考官网文档配置kafka conenct service并启动,这里提供命令参考:
nohup bin/connect-distributed.sh config/connect-distributed.properties >> connect.log
启动过程中,kafka connect service会不断打印出现有plugin的加载过程,以及一些全局参数配置:
9.png
Kafka connect启动时需要提供配置文件,这里提供一份配置的demo,各个参数的详细含义见第六章。

vim path_to_kafka/config/connect-distributed.properties
group.id=connect-cluster
# The name of the Kafka topic where connector offsets are stored
offset.storage.topic=connect-offsets
# Replication factor used when creating the offset storage topic
offset.storage.replication.factor=1
# Interval at which to try committing offsets for tasks.
offset.flush.interval.ms=10000
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
bootstrap.servers=localhost:9092
# List of paths separated by commas (,) that contain plugins 
plugin.path=path_to_kafka/plugin_jars
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
config.storage.topic=config-storage
status.storage.topic=config-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

7)启动adbpg sink connector
配置json格式的adbpg sink connector配置文件,并参考官方文档向kafka connect提交adbpg sink任务,这里提供命令参考:
配置文档实例如下,全部参数配置请参考第六章。

{
     "name":"adb4pg-jdbc-sink",
     "config": {
        "name":"adb4pg-jdbc-sink",
        "topics":"server1.dbo.t1",
        "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
        "connection.url":"jdbc:postgresql://yourinstance-url:port/yourdbname",
        "connection.user":"yourname",
        "connection.password":"******",
        "col.names":"a,b,c,d,e,f,g",
        "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
        "pk.fields":"a",
        "schemas.enable" : "false",
        "target.tablename":"t1",
        "tasks.max":"1",
        "auto.create":"false",
        "table.name.format":"t1",
        "batch.size":"1",
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
        }
}

启动adbpg sink connector:

#查看当前集群kafka connector列表:
curl 127.0.0.1:8083/connectors

#提交配置文件(这里以adb4pg-sink.json为例),并启动adbpg sink connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-sink.json 

#删除connectors
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-sink

#查看connector状态
curl http://localhost:8083/connectors/adb4pg-jdbc-sink/status

10.png

五、格式约定

本链路支持对目标库进行三种操作:insert、delete和update,对于单条操作对应的kafka消息的格式要求为json,json内各个字段的数据格式要求如下。

操作类别 op字段 before字段 after字段 demo
insert c 无限制 要插入记录列名与取值对应的取值对应的字典 {"before": null,"after": {"id": 1,"name": "44"},"op": "c"}
delete d 要删除记录列名与取值对应的取值对应的字典 无限制 {"after": null,"before": {"id": 1,"name": "44"},"op": "d"}
update u 所要更新的原始数据的列名与取值对应的取值对应的字典 更新后记录列名与取值对应的取值对应的字典 {"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u"}

对于某个topic内含有如下消息数据:

{"before": null,"after": {"id": 1,"name": "44"},"op": "c","transaction": null}
{"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u","transaction": null}
{"after": null,"before": {"id": 1,"name": "44"},"op": "d","transaction": null}

对应会产生的同步操作为:本链路对应会向目标库表插入一条id=1,name='44' 的数据;然后按id=2更新数据,将id更新为1,name更新为'44';最后会向目标表删除id=1,name='44'的数据。

六、参数配置

1)zookeeper参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考zookeeper官方文档

参数名 参数含义 推荐值
dataDir 数据目录
dataLogDir 日志目录
clientPort 端口 2181
server.x 各个节点配置 与实际的机器相符

2) kafka参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka官方文档。

参数名 参数含义 推荐值
broker.id 各个broker的唯一id
log.dirs 日志路径
offsets.topic.replication.factor 副本数 3
host.name 当前机器
host 与实际相符
port 端口 9092
zookeeper.connect 所连接的zk地址 与实际的zk地址相符
auto.create.topics.enable 是否允许自动创建topic true
log.retention.hours topic内数据过期时间,单位h 72
log.cleanup.policy topic内过期数据清理策略 delete,即为删除

3)kafka connect参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka connect官方文档。

参数名 参数含义 推荐值
group.id 集群id,kafka connect集群上所有节点取值必须统一,标志属于同一集群
offset.storage.topic 记录kafka-connect位点的topic名称
offset.storage.replication.factor kafka connect相关topic副本数 3
offset.flush.interval.ms kafka connect flush间隔,单位ms 10000
bootstrap.servers 连接到的kafka集群地址 与实际集群相符
plugin.path kafka connector jar包路径

4)adbpg sink connector参数配置

adbpg sink connector支持参数配置如下:

参数名 参数含义 默认值 推荐值
name kafka connector命名
topics 源数据所在topic 与源数据所在topic相符
connector.class connector所对应的实现类 必须写io.confluent.connect.jdbc.Adb4PgSinkConnector
connector.url 目标ADBPG库的jdbc地址
connector.user ADBPG用户名
connector.password ADBPG密码
col.names 写入ADBPG目标表列名
col.types 写入ADBPG目标表列类型
pk.fields 主键列表,以逗号分割
tasks.max 启动线程数 1 1
target.tablename 写入ADBPG目标表名
writeMode 写入方式,取值为insert或upsert insert insert

七、可用性和错误恢复

1)高可用

组件 是否支持高可用 故障切换方式 依赖情况
zookeeper leader挂掉会根据paxos协议重新选主,单个follower挂掉不影响可用性。 不依赖其他组件
kafka 写入多副本,单个broker挂掉不影响可用性。 kafka消息队列会向zk写入状态量,zk不可用影响kafka集群可用性;
kafka connect 由kafka集群维护kafka connect的可用性,kafka connect挂掉之后会触发task rebalance,进行task迁移,单个kafka connect挂掉不影响可用性。 kafka connect会向kafka topic写入消费到的位点(offset),kafka集群不可用影响kafka connect可用性;kafka connect不会向zk写入数据。

2)错误恢复

kafka/kafka connect/zookeeper三个服务组件均支持高可用,三副本配置下,挂掉任意一个或两个节点均可自动恢复任务进度。若同组件三个节点同时挂掉,通常为硬件或网络故障,需要按具体情况排查。

3)Task rebalancing

为了“负载均衡”每个worker上面的task数量和机器负载,kafka connect也有一个rebalance的概念,负责在connecotor启动时,有worker挂掉时,将task迁移到更空闲的worker上。但是要注意,如果是task fail,不会触发rebalance,这种时候通常是代码或数据出现了异常需要人工介入处理。
11.png

八、幂等性/exactly once保证

在目标表配置主键,并且配置writeMode='upsert'的场景下,本链路能够保证exactly-once;在其他场景下,本链路保证at-least-once.
kafka connector task发生失败,kafka自身提供的offset机制进行recovery通常只能恢复到最近一次提交的offset,而不是失败时的状态。upsert模式可以避免有主键的表约束冲突或数据重复的情况。

目录
相关文章
|
1月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
83 5
|
1月前
|
机器学习/深度学习 存储 SQL
数据仓库革新:Snowflake在云数据平台中的创新实践
【10月更文挑战第27天】Snowflake作为云原生数据仓库的领导者,以其多租户、事务性、安全的特性,支持高度可扩展性和弹性,全面兼容SQL及多种数据类型。本文探讨了Snowflake在现代化数据仓库迁移、实时数据分析、数据存储与管理及机器学习集成等领域的创新实践和应用案例,展示了其在云数据平台中的强大优势和未来潜力。
56 2
|
1月前
|
存储 运维 Cloud Native
数据仓库革新:Snowflake在云数据平台中的创新实践
【10月更文挑战第26天】随着大数据时代的到来,数据仓库正经历重大变革。本文探讨了Snowflake在云数据平台中的创新应用,通过弹性扩展、高性能查询、数据安全、多数据源接入和云原生架构等最佳实践,展示了其独特优势,帮助企业提升数据处理和分析效率,保障数据安全,降低运维成本,推动业务快速发展。
64 2
|
2月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
6月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
1022 0
|
7月前
|
Cloud Native 关系型数据库 OLAP
云原生数据仓库产品使用合集之阿里云云原生数据仓库AnalyticDB PostgreSQL版的重分布时间主要取决的是什么
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
7月前
|
运维 Cloud Native 关系型数据库
云原生数据仓库产品使用合集之原生数据仓库AnalyticDB PostgreSQL版如果是列存表的话, adb支持通过根据某个字段做upsert吗
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
4月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
87 8
|
6月前
|
运维 Cloud Native 关系型数据库
云原生数据仓库AnalyticDB产品使用合集之PostgreSQL版是否直接支持实时物化视图
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
135 3
下一篇
DataWorks