通过流处理平台Kafka与云原生数据仓库PostgreSQL做实时数据交互

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 本文介绍如何基于流处理平台Kafka与云原生数据仓库AnalyticDB PostgreSQL做实时数据交互

一、概述

Apache Kafka是一种开源流数据处理平台,因为其部署简单、性能良好的特性得到广泛应用。本文介绍基于Apache Kafka平台将按约定格式与云原生数据仓库PostgreSQL版做实时数据交互,同步数据至云原生数据仓库PostgreSQL版(以下简称ADBPG)的链路。
本文内容安排如下:第二章“背景知识”会介绍本链路中组件的基础知识以及一些名词解释,第三章“原理与架构”会介绍链路的基本架构以及工作原理,第四章“开发入门”会介绍实际的搭建部署步骤,第五章“格式约定”会介绍通过本链路写入的数据格式要求,第六章“参数配置”会给出本链路涉及的一些参数定义并给出推荐的配置值,第七章介绍本链路的可用性和错误恢复情况,第八章介绍本链路的exactly once保证情况。

二、背景知识

1)Kafka

Kafka是一种开源流处理平台,是Apache社区的顶级项目之一,在流处理领域得到广泛应用。本链路基于Kafka平台实现到ADBPG的数据同步,使用时用户将数据写入Kafka topic,本链路会按配置文件消费对应topic内的数据,并将数据同步至ADBPG。

2)Zookeeper

Zookeeper是Apache社区的一套开源分布式服务框架,Kafka集群依赖Zookeeper存储消息队列的状态信息。

3)Kafka connect

Kafka connect是kafka社区维护的一套用于数据数据同步的框架(注意是同步,如果做etl更应该使用flink或spark),用于同步像关系数据库到HDFS这样的链路。Kafka Connect基于Kafka消息队列存储状态信息,并支持故障恢复和task重分布,并不直接依赖于Zookeeper。

3-1)Connectors

Connector分为Source端和Sink端,通过将"Connectors"拼接可以定义数据从哪里读出或者写入哪里。一个source connector可以看做是将外部源数据读入特定kafka topic的接入器,一个sink connector可以看做是将kafka topic数据导出到外部的导出器。
kafka connect还包含kafka instance的概念,一个"connector instance"是负责管理Kafka和其他系统之间的进行数据复制的逻辑概念(更像是其他框架的job概念)。所有这些用于读取或写入的实现都被定义为“connector插件”。"connector instance"和"connector插件"都可以被称为"connectors"。

3-2)Task

Tasks是Connect数据模型中的负责数据拷贝的主要概念。每个Connector instance都是由一系列tasks组成。有了Task概念,kafka connect可以将一个connector拆分成多个task来并行化执行。Task之间无状态,task执行的中间状态存储在特定的topics里面(config.storage.topic和status.storage.topic)。任何时候有task被start、stop或restart,通过这种方式来提供弹性,可扩展的数据pipeline。
5.png
注意kafka connect可以用分布式和standalone两种方式部署。如果是standalone模式,数据被存储在本地磁盘,反之则存储在kafka topic内(用于控制已消费的状态用于rebalance的status.storage.topic,和存储connecotor配置的status.storage.topic)。

3-3)Worker

Connector和task都是逻辑单位,必须被调度在一个Kafka connect worker进程内。有两种部署方式,standalone和distributed。因为最终是用于生产环境,我们只讨论distributed模式。
和其他分布式框架一样,Distributed模式为kafka connect提供了扩展性和错误容忍的特性支持。已经启动的worker使用group.id配置来代表启动的worker是在同一个集群上。如果向集群中添加了worker或worker挂掉,其他的worker会感知到并且自动发起投票,将task重新分配其他可用的机器上。

三、原理与架构

6.png
基于Kafka将数据同步至ADBPG的链路工作流程如上图。
源端通过各种链路将数据写入某个Kafka Topic后,本链路对应启动Adbpg Sink Connector按配置文件的配置读取对应的Kafka Topic的数据。SinkTask会消费对应topic内的数据,更新存储于Kafka内的消费位点记录(offset),解析对应的数据并转化为JDBC操作发送到云原生数据仓库ADBPG。
本链路不会对源端的写入方式和源端类别进行限制,只对源端写入Kafka的数据格式进行约定(详见第五章),用户可以直接将文件或者单条消息通过管道丢入Kafka Topic,也可以通过开源社区(比如DebeziumConfluent)丰富的source connector资源,从MySQL/PostgreSQL/SQLServer等数据库实时同步数据。

四、开发入门

0)版本要求

组件 版本要求 推荐版本
Java 8+ 8
Zookeeper 3.4+ 3.4.14
Kafka kafka_2.11-2.1.1+ kafka_2.11-2.1.1
Kakfa connect 一般采用kafka内置组件,与kafka保持一致即可 一般采用kafka内置组件,与kafka保持一致即可
ADBPG 6.0 6.0

1)安装Java并配置环境变量

首先需要去官网下载jdk安装包并安装到对应平台,并配置好JAVA_HOME环境变量。安装完毕后可以通过输入java -version检查是否已经配置好java环境。
7.png

2)安装zookeeper

需要去zookeeper官网下载安装包并对应安装配置。
安装完毕后可以执行path_to_zookeeper/bin/zkServer.sh status 检查zookeeper是否正常运行

3)安装kafka集群

需要去官网下载kafka安装包,并对应安装配置。安装完毕后,可以执行命令查看那kafka集群当前topic列表。
path_to_zookeeper/bin/kafka-topics.sh --list --zookeeper localhost:2181
这里提供一个参考安装包,可用于核对版本以及参考配置
https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka2_1_1.tar.gz

4)安装kafka connect jar包

下载jar包,并放置入path_to_kafka/plugin_jars路径下。

下载地址:https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka-connect-jdbc-5.3.1-jar-with-dependencies.jar

5)创建ADBPG集群及目标表

5-1)开通ADBPG实例

如果尚未开通ADBPG实例,请参照以下文档开通6.0版本ADBPG实例(4.3版本不支持),并创建初始账号:
https://help.aliyun.com/document_detail/50200.html?spm=a2c4g.11186623.6.557.133412bfHEMkSR
为了达到最佳的写入性能和安全性,建议kafka集群与要写入的ADB PG实例位于同一region、同一VPC下,通过内网通信。

5-2)设置ADBPG实例白名单

ADBPG实例需要设置白名单保证Kafka集群能够访问ADBPG实例:在ADBPG控制台点击进入所创建的ADBPG实例,点击数据安全性-添加白名单分组。
8.png
将对应的网段添加进ADBPG实例白名单,点击确定。

5-3)连接ADBPG实例并创建要写入的目标表

执行写入任务前需要在ADB PG实例中创建要写入的目标表,这里给出一个例子:

create table test15(                  
b1 bigint,
b2 smallint,
b3 smallint,
b4 int,
b5 boolean,
b6 real,                      
b7 double precision,          
b8 double precision,          
b9 date,
b10 time with time zone,
b11 timestamp with time zone,
b12 text,
b15 json
);

6)启动kafka connect service

参考官网文档配置kafka conenct service并启动,这里提供命令参考:
nohup bin/connect-distributed.sh config/connect-distributed.properties >> connect.log
启动过程中,kafka connect service会不断打印出现有plugin的加载过程,以及一些全局参数配置:
9.png
Kafka connect启动时需要提供配置文件,这里提供一份配置的demo,各个参数的详细含义见第六章。

vim path_to_kafka/config/connect-distributed.properties
group.id=connect-cluster
# The name of the Kafka topic where connector offsets are stored
offset.storage.topic=connect-offsets
# Replication factor used when creating the offset storage topic
offset.storage.replication.factor=1
# Interval at which to try committing offsets for tasks.
offset.flush.interval.ms=10000
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
bootstrap.servers=localhost:9092
# List of paths separated by commas (,) that contain plugins 
plugin.path=path_to_kafka/plugin_jars
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
config.storage.topic=config-storage
status.storage.topic=config-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

7)启动adbpg sink connector
配置json格式的adbpg sink connector配置文件,并参考官方文档向kafka connect提交adbpg sink任务,这里提供命令参考:
配置文档实例如下,全部参数配置请参考第六章。

{
     "name":"adb4pg-jdbc-sink",
     "config": {
        "name":"adb4pg-jdbc-sink",
        "topics":"server1.dbo.t1",
        "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
        "connection.url":"jdbc:postgresql://yourinstance-url:port/yourdbname",
        "connection.user":"yourname",
        "connection.password":"******",
        "col.names":"a,b,c,d,e,f,g",
        "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
        "pk.fields":"a",
        "schemas.enable" : "false",
        "target.tablename":"t1",
        "tasks.max":"1",
        "auto.create":"false",
        "table.name.format":"t1",
        "batch.size":"1",
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
        }
}

启动adbpg sink connector:

#查看当前集群kafka connector列表:
curl 127.0.0.1:8083/connectors

#提交配置文件(这里以adb4pg-sink.json为例),并启动adbpg sink connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-sink.json 

#删除connectors
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-sink

#查看connector状态
curl http://localhost:8083/connectors/adb4pg-jdbc-sink/status

10.png

五、格式约定

本链路支持对目标库进行三种操作:insert、delete和update,对于单条操作对应的kafka消息的格式要求为json,json内各个字段的数据格式要求如下。

操作类别 op字段 before字段 after字段 demo
insert c 无限制 要插入记录列名与取值对应的取值对应的字典 {"before": null,"after": {"id": 1,"name": "44"},"op": "c"}
delete d 要删除记录列名与取值对应的取值对应的字典 无限制 {"after": null,"before": {"id": 1,"name": "44"},"op": "d"}
update u 所要更新的原始数据的列名与取值对应的取值对应的字典 更新后记录列名与取值对应的取值对应的字典 {"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u"}

对于某个topic内含有如下消息数据:

{"before": null,"after": {"id": 1,"name": "44"},"op": "c","transaction": null}
{"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u","transaction": null}
{"after": null,"before": {"id": 1,"name": "44"},"op": "d","transaction": null}

对应会产生的同步操作为:本链路对应会向目标库表插入一条id=1,name='44' 的数据;然后按id=2更新数据,将id更新为1,name更新为'44';最后会向目标表删除id=1,name='44'的数据。

六、参数配置

1)zookeeper参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考zookeeper官方文档

参数名 参数含义 推荐值
dataDir 数据目录
dataLogDir 日志目录
clientPort 端口 2181
server.x 各个节点配置 与实际的机器相符

2) kafka参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka官方文档。

参数名 参数含义 推荐值
broker.id 各个broker的唯一id
log.dirs 日志路径
offsets.topic.replication.factor 副本数 3
host.name 当前机器
host 与实际相符
port 端口 9092
zookeeper.connect 所连接的zk地址 与实际的zk地址相符
auto.create.topics.enable 是否允许自动创建topic true
log.retention.hours topic内数据过期时间,单位h 72
log.cleanup.policy topic内过期数据清理策略 delete,即为删除

3)kafka connect参数配置

这里仅列出本链路所必需的参数配置,更多参数配置请参考kafka connect官方文档。

参数名 参数含义 推荐值
group.id 集群id,kafka connect集群上所有节点取值必须统一,标志属于同一集群
offset.storage.topic 记录kafka-connect位点的topic名称
offset.storage.replication.factor kafka connect相关topic副本数 3
offset.flush.interval.ms kafka connect flush间隔,单位ms 10000
bootstrap.servers 连接到的kafka集群地址 与实际集群相符
plugin.path kafka connector jar包路径

4)adbpg sink connector参数配置

adbpg sink connector支持参数配置如下:

参数名 参数含义 默认值 推荐值
name kafka connector命名
topics 源数据所在topic 与源数据所在topic相符
connector.class connector所对应的实现类 必须写io.confluent.connect.jdbc.Adb4PgSinkConnector
connector.url 目标ADBPG库的jdbc地址
connector.user ADBPG用户名
connector.password ADBPG密码
col.names 写入ADBPG目标表列名
col.types 写入ADBPG目标表列类型
pk.fields 主键列表,以逗号分割
tasks.max 启动线程数 1 1
target.tablename 写入ADBPG目标表名
writeMode 写入方式,取值为insert或upsert insert insert

七、可用性和错误恢复

1)高可用

组件 是否支持高可用 故障切换方式 依赖情况
zookeeper leader挂掉会根据paxos协议重新选主,单个follower挂掉不影响可用性。 不依赖其他组件
kafka 写入多副本,单个broker挂掉不影响可用性。 kafka消息队列会向zk写入状态量,zk不可用影响kafka集群可用性;
kafka connect 由kafka集群维护kafka connect的可用性,kafka connect挂掉之后会触发task rebalance,进行task迁移,单个kafka connect挂掉不影响可用性。 kafka connect会向kafka topic写入消费到的位点(offset),kafka集群不可用影响kafka connect可用性;kafka connect不会向zk写入数据。

2)错误恢复

kafka/kafka connect/zookeeper三个服务组件均支持高可用,三副本配置下,挂掉任意一个或两个节点均可自动恢复任务进度。若同组件三个节点同时挂掉,通常为硬件或网络故障,需要按具体情况排查。

3)Task rebalancing

为了“负载均衡”每个worker上面的task数量和机器负载,kafka connect也有一个rebalance的概念,负责在connecotor启动时,有worker挂掉时,将task迁移到更空闲的worker上。但是要注意,如果是task fail,不会触发rebalance,这种时候通常是代码或数据出现了异常需要人工介入处理。
11.png

八、幂等性/exactly once保证

在目标表配置主键,并且配置writeMode='upsert'的场景下,本链路能够保证exactly-once;在其他场景下,本链路保证at-least-once.
kafka connector task发生失败,kafka自身提供的offset机制进行recovery通常只能恢复到最近一次提交的offset,而不是失败时的状态。upsert模式可以避免有主键的表约束冲突或数据重复的情况。

目录
相关文章
|
2月前
|
存储 缓存 Cloud Native
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
MPP架构数据仓库使用问题之ADB PG云原生版本的扩缩容性能怎么样
|
2月前
|
存储 数据管理 BI
揭秘数据仓库的奥秘:数据究竟如何层层蜕变,成为企业决策的智慧源泉?
【8月更文挑战第26天】数据仓库是企业管理数据的关键部分,其架构直接影响数据效能。通过分层管理海量数据,提高处理灵活性及数据一致性和安全性。主要包括:数据源层(原始数据)、ETL层(数据清洗与转换)、数据仓库层(核心存储与管理)及数据服务层(提供分析服务)。各层协同工作,支持高效数据管理。未来,随着技术和业务需求的变化,数仓架构将持续优化。
43 3
|
19天前
|
存储 机器学习/深度学习 数据管理
数据技术的进化史:从数据仓库到数据中台再到数据飞轮
数据技术的进化史:从数据仓库到数据中台再到数据飞轮
|
5天前
|
机器学习/深度学习 消息中间件 搜索推荐
【数据飞轮】驱动业务增长的高效引擎 —从数据仓库到数据中台的技术进化与实战
在数据驱动时代,企业逐渐从数据仓库过渡到数据中台,并进一步发展为数据飞轮。本文详细介绍了这一演进路径,涵盖数据仓库的基础存储与查询、数据中台的集成与实时决策,以及数据飞轮的自动化增长机制。通过代码示例展示如何在实际业务中运用数据技术,实现数据的最大价值,推动业务持续优化与增长。
18 4
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
86 9
|
2月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
69 0
|
2月前
|
Java Spring 监控
Spring Boot Actuator:守护你的应用心跳,让监控变得触手可及!
【8月更文挑战第31天】Spring Boot Actuator 是 Spring Boot 框架的核心模块之一,提供了生产就绪的特性,用于监控和管理 Spring Boot 应用程序。通过 Actuator,开发者可以轻松访问应用内部状态、执行健康检查、收集度量指标等。启用 Actuator 需在 `pom.xml` 中添加 `spring-boot-starter-actuator` 依赖,并通过配置文件调整端点暴露和安全性。Actuator 还支持与外部监控工具(如 Prometheus)集成,实现全面的应用性能监控。正确配置 Actuator 可显著提升应用的稳定性和安全性。
55 0
|
2月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 缓存 Kafka
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?
【Azure 事件中心】使用Kafka消费Azure EventHub中数据,遇见消费慢的情况可以如何来调节呢?

热门文章

最新文章

下一篇
无影云桌面