flink cdc 整理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: flink cdc 整理

flink posgresql cdc

前置工作

1,更改配置文件postgresql.conf

# 更改wal日志方式为logical
wal_level = logical # minimal, replica, or logical
# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20 # max number of replication slots
# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20 # max number of walsender processes
# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
wal_sender_timeout = 180s # in milliseconds; 0 disable  

wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值

更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改

2,新建用户并且给用户复制流权限

-- pg新建用户
CREATE USER user WITH PASSWORD 'pwd';
-- 给用户复制流权限
ALTER ROLE user replication;
-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;
-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

3,发布表

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;
-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- 查询哪些表已经发布
select * from pg_publication_tables;

DataStream Api

1: maveny依赖引入

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
       <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

2.postgresqlCDC2Kafka.java代码

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
public class postgresqlCDC2Kafka {
    public static void main(String[] args) throws Exception {
        String fileName = args[0];
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fileName);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(5000L);
        //指定 CK 的一致性语义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 指定从 CK 自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 2000L));
        //设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://ip:8020/../.."));
        //设置访问 HDFS 的用户名
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Properties properties = new Properties();
        properties.setProperty("snapshot.mode", "initial");
        properties.setProperty("debezium.slot.name", "pg_cdc");
        properties.setProperty("debezium.slot.drop.on.stop", "true");
        properties.setProperty("include.schema.changes", "true");
        SourceFunction<String> sourceFunction = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.xxx")
                .port(5432)
                .database("databseName") // monitor postgres database
                .schemaList("schemaName")  // monitor inventory snachema
                .tableList("schemaName.table1,scheamName.tabl2,...") // monitor products table
                .username("userName")
                .password("password")
                .decodingPluginName("pgoutput")                
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
                .debeziumProperties(properties)
                .build();
        DataStreamSource<String> pgDataStream =
                env
                .addSource(sourceFunction)
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering
        // 设置kafka配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","ip1:9092");
        kafkaProps.setProperty("transaction.max.timeout.ms",90000);
//         sink到kafka
        FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer<>("topicName"), new SimpleStringSchema(), kafkaProps);
        pgDataStream.addSink(flinkKafkaProducer).name("sink2Kafka");
        env.execute("pg_cdc job");
    }
}

    注意:postgresql 11以上,decodingPluginName为pgoutput

    flink cdc错误整理

    1:mysql-cdc指定剔除不需要监听的字段信息时抛出异常:

    即指定"'debezium.column.blacklist'"配置信息时抛出异常

    org.apache.kafka.connect.errors.DataException: order_sales is not a valid field name
      at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$createRowConverter$508c5858$1(RowDataDebeziumDeserializeSchema.java:364)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.lambda$wrapIntoNullableConverter$7b91dc26$1(RowDataDebeziumDeserializeSchema.java:390)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:126)
      at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:101)
      at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97)
      at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
      at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
      at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      分析:指定debezium.column.blacklist该参数的意思是指在debezium监听到事件后会把记录中的指定字段删除,然后在flink做解析转换的时候找不到字段。

      2:cdc source扫描mysql表期间,进行加锁操作。

      解决方案:
             给使用的mysql用户授予reload权限即可。详细见:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server
             使用'debezium.snapshot.locking.mode'='none'

      3:同步锁表

        User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.

          Flink CDC基于Debezium实现。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

          解决办法:创建一个新的MySQL用户并授予其必要的权限。

          mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
          mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
          mysql> FLUSH PRIVILEGES;

          4:Flink作业扫描MySQL全量数据出现fail-over

          Flink 作业在扫描 MySQL 全量数据时,checkpoint 超时,出现作业 failover,如下图:

          image.png

          原因:Flink CDC 在 scan 全表数据(我们的实收表有千万级数据)需要小时级的时间(受下游聚合反压影响),而在 scan 全表过程中是没有 offset 可以记录的(意味着没法做 checkpoint),但是 Flink 框架任何时候都会按照固定间隔时间做 checkpoint,所以此处 mysql-cdc source 做了比较取巧的方式,即在 scan 全表的过程中,会让执行中的 checkpoint 一直等待甚至超时。超时的 checkpoint 会被仍未认为是 failed checkpoint,默认配置下,这会触发 Flink 的 failover 机制,而默认的 failover 机制是不重启。所以会造成上面的现象。

          解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:

          execution.checkpointing.interval: 10min   # checkpoint间隔时间
          execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失败容忍次数
          restart-strategy: fixed-delay  # 重试策略
          restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数

          5:作业在运行时 mysql cdc source 报 no viable alternative at input 'alter table std'

          60a6bcefe26f4b118e50f46e4d0afd1d.png

          原因:因为数据库中别的表做了字段修改,CDC source 同步到了 ALTER DDL 语句,但是解析失败抛出的异常。

          解决方法:在 flink-cdc-connectors 最新版本中已经修复该问题(跳过了无法解析的 DDL)。升级 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替换 flink/lib 下的旧包。

          6:多个作业共用同一张 source table 时,没有修改 server id 导致读取出来的数据有丢失。

          原因:MySQL binlog 数据同步的原理是,CDC source 会伪装成 MySQL 集群的一个 slave(使用指定的 server id 作为唯一 id),然后从 MySQL 拉取 binlog 数据。如果一个 MySQL 集群中有多个 slave 有同样的 id,就会导致拉取数据错乱的问题。

          解决方法:默认会随机生成一个 server id,容易有碰撞的风险。所以建议使用动态参数(table hint)在 query 中覆盖 server id。如下所示:

          FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

          7: flinksql cdc时区差8小时的问题

          在连接参数中设置 ‘server-time-zone’ = ‘Asia/Shanghai’

          比如:

          WITH (
          ‘connector’ = ‘mysql-cdc’,
          ‘hostname’ = ‘xxx’,
          ‘port’ = ‘3306’,
          ‘username’ = ‘root’,
          ‘password’ = ‘root’,
          ‘database-name’ = ‘xxx’,
          ‘table-name’ = ‘xxx’,
          ‘server-time-zone’ = ‘Asia/Shanghai’

            不设置的话可能会改变MySQL中时间字段比如datetime减8小时


            在sql语句中使用LOCALTIMESTAMP或者手动给时间戳加8小时而不要用current_date等

            效果如图:

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            补充:

            如果要sink到MySQL的话,在url后加&serverTimezone=Asia/Shanghai 否则时区也会对不上或者在url上添加

            jdbc:mysql://${hostname}/${db_name}useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true

            8:flink cdc Encountered chage event for table xxx.xxxx whose schema isn't known to this connector

            解决方案:

            inconsistent.schema.handing.mode=''warn'

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            9: Flinksql From Mysql-cdc Sink to Hbase Cause Miss Data

            定位:

               1:改源码,增加log

               2:查看写入逻辑

            #open逻辑,有个定时任务刷新
            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
                    this.executor = Executors.newScheduledThreadPool(
                      1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                    this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                      if (closed) {
                        return;
                      }
                      try {
                        flush();
                      } catch (Exception e) {
                        // fail the sink and skip the rest of the items
                        // if the failure handler decides to throw an exception
                        failureThrowable.compareAndSet(null, e);
                      }
                    }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
                  }
            # invoke逻辑
            if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
                  flush();
            }
            # snapshot逻辑,当队列中还有数据请求未刷新时才满足
            while (numPendingRequests.get() != 0) {
                  flush();
            }

            以RowKey=0为例发现操作已经被封住在Mutation中,且已经被刷新了。但在hbase中并未找到该key.猜测可能在Mutator处理乱序数据了。

            60a6bcefe26f4b118e50f46e4d0afd1d.png

            搜索查证资料:

            https://www.jianshu.com/p/1a753ffcbe2ahttps://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455

            解决方案:

            :短期方案:设置'sink.buffer-flush.max-rows'='2'暂时规避该问题,但对rs会有较大压力
            2:彻底解决:基于issue改造源码

            10:相关参数说明:

            snapshot.mode的各种参数,以下是测试效果
            properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector
            properties.setProperty("snapshot.mode", "initial");每次重启都会读全量
            properties.setProperty("snapshot.mode", "initial_only");//读不到数据
            properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似
            properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到
            properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot
              60a6bcefe26f4b118e50f46e4d0afd1d.png
            相关实践学习
            基于Hologres轻松玩转一站式实时仓库
            本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
            Linux入门到精通
            本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
            相关文章
            |
            13天前
            |
            消息中间件 资源调度 关系型数据库
            如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
            本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
            40 9
            |
            2月前
            |
            算法 API Apache
            Flink CDC:新一代实时数据集成框架
            本文源自阿里云实时计算团队 Apache Flink Committer 任庆盛在 Apache Asia CommunityOverCode 2024 的分享,涵盖 Flink CDC 的概念、版本历程、内部实现及社区未来规划。Flink CDC 是一种基于数据库日志的 CDC 技术实现的数据集成框架,能高效完成全量和增量数据的实时同步。自 2020 年以来,Flink CDC 经过多次迭代,已成为功能强大的实时数据集成工具,支持多种数据库和数据湖仓系统。未来将进一步扩展生态并提升稳定性。
            594 1
            Flink CDC:新一代实时数据集成框架
            |
            2月前
            |
            消息中间件 canal 数据采集
            Flink CDC 在货拉拉的落地与实践
            陈政羽在Apache Asia Community Over Code 2024上分享了《货拉拉在Flink CDC生产实践落地》。文章介绍了货拉拉业务背景、技术选型及其在实时数据采集中的挑战与解决方案,详细阐述了Flink CDC的技术优势及在稳定性、兼容性等方面的应用成果。通过实际案例展示了Flink CDC在提升数据采集效率、降低延迟等方面的显著成效,并展望了未来发展方向。
            542 14
            Flink CDC 在货拉拉的落地与实践
            |
            3月前
            |
            Oracle 关系型数据库 新能源
            Flink CDC 在新能源制造业的实践
            本文撰写自某新能源企业的研发工程师 单葛尧 老师。本文详细介绍该新能源企业的大数据平台中 CDC 技术架构选型和 Flink CDC 的最佳实践。
            447 13
            Flink CDC 在新能源制造业的实践
            |
            3月前
            |
            SQL 数据库 流计算
            Flink CDC数据读取问题之一致性如何解决
            Flink CDC 使用Change Data Capture (CDC)技术从数据库捕获变更事件,并利用Flink的流处理能力确保数据读取一致性。相较于传统工具,它具备全增量一体化数据集成能力,满足实时性需求。在实践中解决了高效数据同步、稳定同步大量表数据等问题。应用场景包括实时数据同步、实时数据集成等。快速上手需学习基本概念与实践操作。未来发展方向包括提升效率与稳定性,并依据用户需求持续优化。
            126 1
            |
            4月前
            |
            关系型数据库 API Apache
            Flink CDC:基于 Apache Flink 的流式数据集成框架
            本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
            18213 11
            Flink CDC:基于 Apache Flink 的流式数据集成框架
            |
            4月前
            |
            SQL JSON 缓存
            玳数科技集成 Flink CDC 3.0 的实践
            本文投稿自玳数科技工程师杨槐老师,介绍了 Flink CDC 3.0 与 ChunJun 框架在玳数科技的集成实践。
            593 7
            玳数科技集成 Flink CDC 3.0 的实践
            |
            3月前
            |
            消息中间件 关系型数据库 MySQL
            实时计算 Flink版操作报错合集之遇到iava.lang.NoClassDefFoundError: ververica/cdc/common/utils/StrinaUtils错误,是什么导致的
            在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
            |
            3月前
            |
            资源调度 关系型数据库 MySQL
            【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
            【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
            294 2
            |
            3月前
            |
            SQL 关系型数据库 MySQL
            “震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
            【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
            338 1

            热门文章

            最新文章