flink cdc多种数据源安装、配置与验证(超详细总结)(上)

本文涉及的产品
云数据库 RDS SQL Server,独享型 2核4GB
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
实时计算 Flink 版,5000CU*H 3个月
简介: flink cdc多种数据源安装、配置与验证(超详细总结)

本文目录结构

|___ 1. 前言

|___ 2. 数据源安装与配置

|______ 2.1 MySQL

|_________ 2.1.1 安装

|_________ 2.1.2 CDC 配置

|______ 2.2 Postgresql

|_________ 2.2.1 安装

|_________ 2.2.2 CDC 配置

|______ 2.3 Oracle

|_________2.3.1 安装

|_________2.3.2 CDC 配置

|_______2.4 SQLServer

|_________2.4.1 安装

|_________2.4.2 CDC 配置

|___ 3. 验证

|_______3.1 Flink版本与CDC版本的对应关系

|_______3.2 下载相关包

|_______3.3 添加cdc jar 至lib目录

|_______3.4 验证


1. 前言

关于如何使用和配置flink cdc功能,其实在官方文档(https://ververica.github.io/flink-cdc-connectors/master/)有相关的教程了,如下:

但是讲解的不是很详细,比如数据源怎么安装?怎么配置?都没有很详细的描述每一步骤,因此博主前面发布多篇文章以此来记录flink cdc相关数据源以及其配置相关的文章,有兴趣的同学可以参考下:

本文主要就是记录在docker下安装和配置各种数据源,以实现flink cdc的功能,包含如下常见的数据源:

数据源 版本
MySQL 8.0.25
Postgresql 10.6
Oracle 11g
SqlServer 2019

2. 数据源安装与配置

2.1 MySQL

版本:8.0.25

2.1.1 安装

Step1: 拉取mysql镜像:

docker pull mysql:8.0.25

Step2: 创建并运行 MySQL 容器

docker run -d -p 30025:3306 --name mysql8.0.25 -e MYSQL_ROOT_PASSWORD=root mysql:8.0.25

2.1.2 CDC 配置

Step1:进入正在运行的mysql容器:

docker exec -it mysql8.0.25 mysql -uroot -proot

Step2:配置 CDC

-- 启用二进制日志
mysql> SET GLOBAL log_bin = ON;
-- 设置二进制日志格式为行级别
mysql> SET GLOBAL binlog_format = 'ROW';

Step3(非必要):如果配置没生效,重启容器

docker restart mysql8.0.25

2.2 Postgresql

版本:PostgreSQL 10.6 (Debian 10.6-1.pgdg90+1)

2.2.1 安装

Step1: 拉取 PostgreSQL 10.6 版本的镜像:

docker pull postgres:10.6

Step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

docker run -d -p 30028:5432 --name postgres-10.6 -e POSTGRES_PASSWORD=postgres postgres:10.6 -c 'shared_preload_libraries=pgoutput'

Step3: 查看容器是否创建成功:

docker ps | grep postgres-10.6

2.2.2 CDC 配置

Step1:docker进去Postgresql数据的容器:

docker exec -it postgres-10.6  bash

Step2:编辑postgresql.conf配置文件:

vi /var/lib/postgresql/data/postgresql.conf

配置内容如下:

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

Step3:重启容器:

docker restart postgres-10.6

连接数据库,如果查询一下语句,返回logical表示修改成功:

SHOW wal_level;

Step4:新建用户并赋权。使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库。

-- 创建数据库 test_db
CREATE DATABASE test_db;
-- 连接到新创建的数据库 test_db
\c test_db
-- 创建 t_user 表
CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);
-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';
-- 给用户复制流权限
ALTER ROLE test1 replication;
-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;
-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

Step4:发布表:

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;
-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

2.3 Oracle

版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

2.3.1 安装

Step1:拉取 oracle 11g 镜像(有6g,要等较长的时间)

docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

Step2:执行以下命令以创建并运行 Oracle 11g 容器

docker run -d -p 30026:1521 -p 8081:8080 \
--name oracle_11g \
-e ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2 \
-e ORACLE_SID=helowin \
registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

Step3:查看容器是否启动

docker ps -a|grep oracle_11g

Step4:进入容器

docker exec -it oracle_11g bash

**Step5:**设置账号密码

# 1. 切换至root用户(默认是oracle用户),密码为helowin
su root
# 2. 创建软链接
ln -s $ORACLE_HOME/bin/sqlplus /usr/bin
# 3.切换回oracle用户
su oracle
# 4. 登录sql plus
sqlplus /nolog
conn /as sysdba
## 4.1 修改system用户密码为system
alter user system identified by system;
## 4.2 修改sys用户密码为system
alter user sys identified by system;
## 4.3 新增一个测试用户(用户名:test,密码:test123);
create user test identified by test123;
## 4.4 将dba权限给内部管理员账号和密码
grant connect,resource,dba to test;
## 4.5 修改密码策略规则为:密码永不过期
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;
## 4.6 修改数据库最大连接数;
alter system set processes=1000 scope=spfile;
## 4.7 最后重启数据库;
shutdown immediate;
startup;
# 5.退出
exit

2.3.2 CDC 配置

Step1:进入容器

docker exec -it oracle_11g bash

Step2:以DBA的权限登录数据库

sqlplus /nolog
CONNECT sys/system AS SYSDBA

Step3:启用日志归档

-- 设置数据库恢复文件目标大小为10G
alter system set db_recovery_file_dest_size = 10G;
-- 设置数据库恢复文件目标路径
alter system set db_recovery_file_dest = '/home/oracle/app/oracle/product/11.2.0' scope=spfile;
-- 立即关闭数据库
shutdown immediate;
-- 以mount模式启动数据库
startup mount;
-- 启用数据库归档日志模式
alter database archivelog;
-- 打开数据库,允许用户访问
alter database open;

Step4:查看日志归档是否启用(如果显示“Archive Mode”表示已经启用)

archive log list;
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
21天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
467 0
|
21天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
348 0
|
21天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之下载了mysql的cdc的demo,在本地调试时,报错:找不到这个包,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
118 0
|
21天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
782 0
|
21天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
682 0
|
21天前
|
消息中间件 资源调度 Java
实时计算 Flink版操作报错合集之遇到了缺少包的错误,已经添加了相应的 jar 包,仍然出现同样的报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
660 2
|
21天前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
21天前
|
监控 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在配置连接时,添加了scan.startup.mode参数后,出现报错。是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
752 0
|
21天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接RabbitMQ时遇到Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory'错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
384 0
|
21天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
306 0