flink cdc 整理

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: flink cdc 整理

flink posgresql cdc

前置工作

1,更改配置文件postgresql.conf

# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值

更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改

2,新建用户并且给用户复制流权限

-- pg新建用户
CREATE USER user WITH PASSWORD 'pwd';
-- 给用户复制流权限
ALTER ROLE user replication;
-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;
-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

3,发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

DataStream Api

1: maveny依赖引入

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
       <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

2.postgresqlCDC2Kafka.java代码

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class postgresqlCDC2Kafka {
    public static void main(String[] args) throws Exception {
        String fileName = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileName);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(5000L);
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
        //设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://ip:8020/../.."));
        //设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "initial");
        properties.setProperty("debezium.slot.name", "pg_cdc");
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.xxx")
                .port(5432)
                .database("databseName") // monitor postgres database
                .schemaList("schemaName")  // monitor inventory snachema
                .tableList("schemaName.table1,scheamName.tabl2,...") // monitor products table
                .username("userName")
                .password("password")
                .decodingPluginName("pgoutput")                
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();
        DataStreamSource<String> pgDataStream =
                env
                .addSource(sourceFunction)
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering
        // 设置kafka配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","ip1:9092");
        kafkaProps.setProperty("transaction.max.timeout.ms",90000);
//         sink到kafka
        FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer<>("topicName"), new SimpleStringSchema(), kafkaProps);
        pgDataStream.addSink(flinkKafkaProducer).name("sink2Kafka");
        env.execute("pg_cdc job");
    }
}

    注意:postgresql 11以上,decodingPluginName为pgoutput

    flink cdc错误整理

    1:mysql-cdc指定剔除不需要监听的字段信息时抛出异常:

    即指定"'debezium.column.blacklist'"配置信息时抛出异常

    org.apache.kafka.connect.errors.DataException: order_sales is not a valid field name
      at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$createRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:364)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$wrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:390)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:126)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:101)
      at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97)
      at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
      at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
      at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      分析:指定debezium.column.blacklist该参数的意思是指在debezium监听到事件后会把记录中的指定字段删除,然后在flink做解析转换的时候找不到字段。

      2:cdc source扫描mysql表期间,进行加锁操作。

      解决方案:
             给使用的mysql用户授予reload权限即可。详细见:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
             使用'debezium.snapshot.locking.mode'='none'

      3:同步锁表

        User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

          Flink CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

          解决办法:创建一个新的MySQL用户并授予其必要的权限。

          mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
          mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
          mysql> FLUSH PRIVILEGES;

          4:Flink作业扫描MySQL全量数据出现fail-over

          Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:

          image.png

          原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。

          解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

          execution.checkpointing.interval: 10min   # checkpoint间隔时间
          execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
          restart-strategy: fixed-delay  # 重试策略
          restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

          5:作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'

          60a6bcefe26f4b118e50f46e4d0afd1d.png

          原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

          解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。

          6:多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

          原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

          解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:

          FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

          7: flinksql cdc时区差8小时的问题

          在连接参数中设置 ‘server-time-zone’ = ‘Asia/Shanghai’

          比如:

          WITH (
          ‘connector’ = ‘mysql-cdc’,
          ‘hostname’ = ‘xxx’,
          ‘port’ = ‘3306’,
          ‘username’ = ‘root’,
          ‘password’ = ‘root’,
          ‘database-name’ = ‘xxx’,
          ‘table-name’ = ‘xxx’,
          ‘server-time-zone’ = ‘Asia/Shanghai’

            不设置的话可能会改变MySQL中时间字段比如datetime减8小时


            在sql语句中使用LOCALTIMESTAMP或者手动给时间戳加8小时而不要用current_date等

            效果如图:

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            补充:

            如果要sink到MySQL的话,在url后加&serverTimezone=Asia/Shanghai 否则时区也会对不上或者在url上添加

            jdbc:mysql://${hostname}/${db_name}useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true

            8:flink cdc Encountered chage event for table xxx.xxxx whose schema isn't known to this connector

            解决方案:

            inconsistent.schema.handing.mode=''warn'

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            9: Flinksql From Mysql-cdc Sink to Hbase Cause Miss Data

            定位:

               1:改源码,增加log

               2:查看写入逻辑

            #open逻辑,有个定时任务刷新
            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
                    this.executor = Executors.newScheduledThreadPool(
                      1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                    this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                      if (closed) {
                        return;
                      }
                      try {
                        flush();
                      } catch (Exception e) {
                        // fail the sink and skip the rest of the items
                        // if the failure handler decides to throw an exception
                        failureThrowable.compareAndSet(null, e);
                      }
                    }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
                  }
            # invoke逻辑
            if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
                  flush();
            }
            # snapshot逻辑,当队列中还有数据请求未刷新时才满足
            while (numPendingRequests.get() != 0) {
                  flush();
            }

            以RowKey=0为例发现操作已经被封住在Mutation中,且已经被刷新了。但在hbase中并未找到该key.猜测可能在Mutator处理乱序数据了。

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            搜索查证资料:

            https://www.jianshu.com/p/1a753ffcbe2ahttps://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455

            解决方案:

            :短期方案:设置'sink.buffer-flush.max-rows'='2'暂时规避该问题,但对rs会有较大压力
            2:彻底解决:基于issue改造源码

            10:相关参数说明:

            snapshot.mode的各种参数,以下是测试效果
            properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector
            properties.setProperty("snapshot.mode", "initial");每次重启都会读全量
            properties.setProperty("snapshot.mode", "initial_only");//读不到数据
            properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似
            properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到
            properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot
              60a6bcefe26f4b118e50f46e4d0afd1d.png
            相关实践学习
            基于Hologres+Flink搭建GitHub实时数据大屏
            通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
            实时计算 Flink 实战课程
            如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
            相关文章
            |
            8月前
            |
            SQL 关系型数据库 Apache
            从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
            本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
            3000 0
            从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
            |
            消息中间件 关系型数据库 MySQL
            基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
            基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
            1268 0
            |
            11月前
            |
            数据采集 SQL canal
            Amoro + Flink CDC 数据融合入湖新体验
            本文总结了货拉拉高级大数据开发工程师陈政羽在Flink Forward Asia 2024上的分享,聚焦Flink CDC在货拉拉的应用与优化。内容涵盖CDC应用现状、数据入湖新体验、入湖优化及未来规划。文中详细分析了CDC在多业务场景中的实践,包括数据采集平台化、稳定性建设,以及面临的文件碎片化、Schema演进等挑战。同时介绍了基于Apache Amoro的湖仓融合架构,通过自优化服务解决小文件问题,提升数据新鲜度与读写平衡。未来将深化Paimon与Amoro的结合,打造更高效的入湖生态与自动化优化方案。
            594 1
            Amoro + Flink CDC 数据融合入湖新体验
            |
            11月前
            |
            SQL 关系型数据库 MySQL
            Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
            Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
            1707 1
            Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
            |
            12月前
            |
            SQL API Apache
            Dinky 和 Flink CDC 在实时整库同步的探索之路
            本次分享围绕 Dinky 的整库同步技术演进,从传统数据集成方案的痛点出发,探讨了 Flink CDC Yaml 作业的探索历程。内容分为三个部分:起源、探索、未来。在起源部分,分析了传统数据集成方案中全量与增量割裂、时效性低等问题,引出 Flink CDC 的优势;探索部分详细对比了 Dinky CDC Source 和 Flink CDC Pipeline 的架构与能力,深入讲解了 YAML 作业的细节,如模式演变、数据转换等;未来部分则展望了 Dinky 对 Flink CDC 的支持与优化方向,包括 Pipeline 转换功能、Transform 扩展及实时湖仓治理等。
            1344 12
            Dinky 和 Flink CDC 在实时整库同步的探索之路
            |
            Oracle 关系型数据库 Java
            【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
            本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
            【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
            |
            关系型数据库 MySQL 数据库
            基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
            TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
            2664 5
            基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
            |
            10月前
            |
            消息中间件 SQL 关系型数据库
            Flink CDC + Kafka 加速业务实时化
            Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
            |
            Java 关系型数据库 MySQL
            SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
            通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
            3180 45
            |
            存储 SQL Java
            Flink CDC + Hologres高性能数据同步优化实践
            本文整理自阿里云高级技术专家胡一博老师在Flink Forward Asia 2024数据集成(二)专场的分享,主要内容包括:1. Hologres介绍:实时数据仓库,支持毫秒级写入和高QPS查询;2. 写入优化:通过改进缓冲队列、连接池和COPY模式提高吞吐量和降低延迟;3. 消费优化:优化离线场景和分区表的消费逻辑,提升性能和资源利用率;4. 未来展望:进一步简化用户操作,支持更多DDL操作及全增量消费。Hologres 3.0全新升级为一体化实时湖仓平台,提供多项新功能并降低使用成本。
            870 1
            Flink CDC + Hologres高性能数据同步优化实践

            热门文章

            最新文章