flink postgresql cdc实时同步(含pg安装配置等)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: flink postgresql cdc实时同步(含pg安装配置等)

1. 环境信息

类型 版本/描述
docker 20.10.9
Postgresql 10.6
初始化账号密码:postgres/postgres
普通用户:test1/test123
数据库:test_db
flink 1.13.6

2. 安装

step1: 拉取 PostgreSQL 10.6 版本的镜像:

docker pull postgres:10.6

step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'

step3: 查看容器是否创建成功:

docker ps | grep postgres-10.6

3. 配置

step1:docker进去Postgresql数据的容器:

docker exec -it postgres-10.6  bash

step2:编辑postgresql.conf配置文件:

vi /var/lib/postgresql/data/postgresql.conf

配置内容如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

step3:重启容器:

docker restart postgres-10.6

连接数据库,如果查询一下语句,返回logical表示修改成功:

SHOW wal_level;

4. 新建用户并赋权

使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库。

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;
-- 连接到新创建的数据库 test_db
\c test_db
-- 创建 t_user 表
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);

新建用户并且给用户权限:

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';
-- 给用户复制流权限
ALTER ROLE test1 replication;
-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;
-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

5. 发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:

-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

6. flink sql

-- 源表定义
CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '10.194.183.120',
      'port' = '30028',
      'username' = 'test1',
      'password' = 'test123',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput'
)
-- 目标表表定义
CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://10.194.183.120:30306/test',
      'username' = 'root',
      'password' = 'root',
      'table-name' = 't_user_copy'
)
-- insert语句
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`)

7. 命令汇总

-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';
-- 给用户复制流权限
ALTER ROLE ODPS_ETL replication;
-- 给用户数据库权限
GRANT CONNECT ON DATABASE test_db to test1;
-- 设置发布开关
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;
-- 给表查询权限
grant select on TABLE aa to ODPS_ETL;
-- 给用户读写权限
grant select,insert,update,delete ON  ALL TABLES IN SCHEMA public to bd_test;
-- 把当前库所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ODPS_ETL;
-- 把当前库以后新建的表查询权限赋给用户
alter default privileges in schema public grant select on tables to ODPS_ETL;
-- 更改复制标识包含更新和删除之前值
ALTER TABLE test0425 REPLICA IDENTITY FULL;
-- 查看复制标识
select relreplident from pg_class where relname='test0425';
-- 查看solt使用情况
SELECT * FROM pg_replication_slots;
-- 删除solt
SELECT pg_drop_replication_slot('zd_org_goods_solt');
-- 查询用户当前连接数
select usename, count(*) from pg_stat_activity group by usename order by count(*) desc;
-- 设置用户最大连接数
alter role odps_etl connection limit 200;

附:

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
23天前
|
资源调度 调度 流计算
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
|
23天前
|
存储 NoSQL 分布式数据库
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
|
25天前
|
关系型数据库 MySQL Linux
在Linux中,如何配置数据库服务器(如MySQL或PostgreSQL)?
在Linux中,如何配置数据库服务器(如MySQL或PostgreSQL)?
|
28天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
60 2
|
18天前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之如何进行PostgreSQL(简称PG)的全量和增量备份管理
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
2月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之在处理PostgreSQL数据库遇到报错。该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
存储 SQL 关系型数据库
实时计算 Flink版产品使用问题之要配置MySQL集群存储节点,该如何配置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。