问题一:Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?
Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?
(1技术实现:oracle:11g/19c,java jdk:17,flink:1.14.2,flink-connector-oracle-cdc(flink-connector-debezium):2.4.1
(2)问题现象:数据同步服务自启动开始,业务上若长时间(如夜间休息时间)没有触发表数据的变更操作(如insert、update、delete),会导致flink cdc捕获到表数据的延迟较大
(3)详细描述:对oracle表的数据进行变更操作(如insert)后,flink cdc捕获到表的insert数据时,需要等待近40分钟:
(3.1)服务启动开始时间:2023-11-29 20:23:35
(3.2)服务运行期间,若短时间内业务有触发表数据的变更操作,flink cdc是可以捕获到表数据的,但之后长时间业务上没有任何的表数据变更操作,一直到时间线1
(3.3)服务运行期间,时间线1:2023-11-30 09:17:47:手动通过insert语句,直接向EBR_CLOUD_GL_HWG.GL001_EBR_PI_TEXT业务表插入了一条记录
(3.4)服务运行期间,时间线2:2023-11-30 09:58:10:flink cdc才捕获到这条insert记录,延迟了近40分钟
(4)正常现象:在不重启此服务的情况下(或者重启服务后),使用相同的数据进行同样的insert操作后,大概在2分钟内,也都可以捕获到此insert记录
(5)服务运行期间,日志无任何报错信息。
参考回答:
根据您提供的信息,这个问题可能是由于Flink CDC在长时间没有数据变更时,会将时间线推进到下一个时间点。当有新的数据插入时,Flink CDC会重新计算时间线,导致捕获延迟增加。
为了解决这个问题,您可以尝试以下方法:
- 调整Flink CDC的时间间隔。您可以在Flink CDC的配置中设置一个较短的时间间隔,以便在没有数据变更时,Flink CDC可以更快地更新时间线。例如,您可以将
maximum-interval
设置为5分钟或更短的时间。 - 优化业务逻辑。确保您的业务逻辑可以在不触发数据变更的情况下正常运行。如果可能的话,可以考虑使用缓存或其他技术来减少对数据库的访问。
- 考虑使用其他CDC工具。如果您发现Flink CDC的性能问题无法通过上述方法解决,您可以考虑使用其他CDC工具,如Debezium或Kafka Connect。这些工具可能在性能方面表现更好,或者提供了更多的配置选项来满足您的需求。
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/575857
问题二:Flink CDC是一个作业扮演一个MySQL Slave么?
Flink CDC是一个作业扮演一个MySQL Slave么?
参考回答:
Flink CDC并不是一个作业扮演MySQL Slave的角色。Flink CDC是Flink社区开发的Flink-cdc-connector组件,主要用于从MySQL、PostgreSQL等数据库中直接读取全量数据和增量变更数据。其实现机制通常分为主动查询和事件接收两种模式。在主动查询模式下,通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息,然后通过不断的查询与上次的记录进行对比,确定数据是否有变动,从而决定是否需要同步。
对于MySQL来说,Flink提供了专门的CDC连接器,该连接器首先读取表的快照块,然后继续读取binlog。无论是在快照阶段还是读取binlog阶段,这个连接器都能保证数据的准确读取,即使任务出现故障。
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/575856
问题三:这种可以导入到一个flink cdc source table吗?
这边有一个数据库实例,里面几万个表,从官网上了解,好像每一个Flink CDC Table都需要扮演一个mysql slave去抓取数据。我们这个场景是不是Flink CDC不支持?我表a有两个字段是name varchar,age int 然后表b有两个字段aa double,bb int。这种可以导入到一个flink cdc source table?只有两个表有共性,类似分库分表这种场景下,这样的多个相似的表才能导入到一个Flink CDC source table吧。如果我的业务数据库就是有很多不同的业务表,这种场景不知道能不能搞定
参考回答:
Flink CDC支持将多个表的数据导入到一个Flink CDC source table中。你可以使用UNION ALL
操作符将多个表的数据合并成一个结果表,然后将其作为Flink CDC source table的输入。
对于你提到的两个表a和b,它们具有相同的字段类型,因此可以直接将它们合并成一个结果表。假设你已经创建了两个表a和b的Flink SQL查询,可以使用以下代码将它们合并:
SELECT * FROM table_a UNION ALL SELECT * FROM table_b;
这将返回一个包含表a和b所有字段的结果表。你可以将这个结果表作为Flink CDC source table的输入,以便从数据库中捕获数据更改并将其流式传输到Flink应用程序中进行处理。
对于更复杂的分库分表场景,你可以根据业务需求编写相应的SQL查询来合并多个相似的表,并将结果表作为Flink CDC source table的输入。这样,你就可以在Flink应用程序中处理来自不同业务表的数据更改了。
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/575852
问题四:Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?
Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?
参考回答:
在Flink CDC中,将多个大表同步到单个表的方案有以下几种:
- 使用UNION ALL操作符:可以将多个大表的数据通过UNION ALL操作符合并到一个结果表中。这种方式适用于两个或多个大表的结构相同,且需要保留所有数据的情况。
- 使用JOIN操作符:如果多个大表之间存在关联关系,可以使用JOIN操作符将它们连接在一起。这种方式适用于需要根据关联条件将不同大表的数据进行关联的情况。
- 使用子查询:可以在一个SELECT语句中嵌套另一个SELECT语句,将多个大表的数据作为子查询的结果进行聚合或筛选。这种方式适用于需要对多个大表的数据进行复杂处理的情况。
- 使用窗口函数:如果需要对多个大表的数据进行窗口计算,可以使用窗口函数将它们按照时间窗口进行分组。这种方式适用于需要对多个大表的数据进行窗口计算的情况。
- 使用Flink Table API:Flink Table API提供了丰富的API接口,可以方便地实现多个大表到单个表的同步。可以通过定义TableSource、TableSink和TableFunction等接口来实现数据的读取、写入和转换。
关于本问题的更多回答可点击原文查看:
https://developer.aliyun.com/ask/575851
问题五:FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去
FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing();
env.getCheckpointConfig().setCheckpointInterval(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setParallelism(1);
EnvironmentSettings Settings = EnvironmentSettings.newInstance() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings); String sourceDDL = "CREATE TABLE TS_XX_source (\n" + "ACCT_ID DECIMAL,\n" + " SEQ DECIMAL,\n" + " CC_TYPE DECIMAL,\n" + " CREDIT_LIMIT DECIMAL,\n" + " EFF_DATE TIMESTAMP(3),\n" + " EXP_DATE TIMESTAMP(3),\n" + " SP_ID DECIMAL,\n" + " primary key (ACCT_ID) not enforced\n" + ") WITH (\n" + " 'connector' = 'oracle-cdc',\n" + " 'hostname' = '******',\n" + " 'port' = '******',\n" + " 'username' = '******',\n" + " 'password' = '******',\n" + " 'database-name' = '******',\n" + " 'schema-name' = '******'," + " 'table-name' = '******',\n" + " 'scan.startup.mode' = 'latest-offset'\n" + ")"; String sinkDDL = "CREATE TABLE TS_XX_sink (\n" + "ACCT_ID DECIMAL,\n" + " SEQ DECIMAL,\n" + " CC_TYPE DECIMAL,\n" + " CREDIT_LIMIT DECIMAL,\n" + " EFF_DATE TIMESTAMP(3),\n" + " EXP_DATE TIMESTAMP(3),\n" + " SP_ID DECIMAL,\n" + " primary key (ACCT_ID) not enforced\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" + " 'url' = '******',\n" + " 'username' = '****',\n" + " 'password' = '****',\n" + " 'table-name' = '****'\n" + ")"; String transformDmlSQL = "insert into TS_XX_sink select * from TS_XX_source"; tableEnv.executeSql(sourceDDL).print(); tableEnv.executeSql(sinkDDL).print(); tableEnv.executeSql(transformDmlSQL).print(); env.execute("sync-cdc"); }
参考回答:
从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:
- 网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。
- Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。
- Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加
checkpointInterval
的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints
的值,以便在检查点之间有更多的时间间隔。 - Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。
- 数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。
关于本问题的更多回答可点击原文查看: