实时计算 Flink版产品使用合集之将多个表的数据导入到一个 source table中如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

问题一:Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?


Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?

(1技术实现:oracle:11g/19c,java jdk:17,flink:1.14.2,flink-connector-oracle-cdc(flink-connector-debezium):2.4.1

(2)问题现象:数据同步服务自启动开始,业务上若长时间(如夜间休息时间)没有触发表数据的变更操作(如insert、update、delete),会导致flink cdc捕获到表数据的延迟较大

(3)详细描述:对oracle表的数据进行变更操作(如insert)后,flink cdc捕获到表的insert数据时,需要等待近40分钟:

(3.1)服务启动开始时间:2023-11-29 20:23:35

(3.2)服务运行期间,若短时间内业务有触发表数据的变更操作,flink cdc是可以捕获到表数据的,但之后长时间业务上没有任何的表数据变更操作,一直到时间线1

(3.3)服务运行期间,时间线1:2023-11-30 09:17:47:手动通过insert语句,直接向EBR_CLOUD_GL_HWG.GL001_EBR_PI_TEXT业务表插入了一条记录

(3.4)服务运行期间,时间线2:2023-11-30 09:58:10:flink cdc才捕获到这条insert记录,延迟了近40分钟

(4)正常现象:在不重启此服务的情况下(或者重启服务后),使用相同的数据进行同样的insert操作后,大概在2分钟内,也都可以捕获到此insert记录

(5)服务运行期间,日志无任何报错信息。


参考回答:

根据您提供的信息,这个问题可能是由于Flink CDC在长时间没有数据变更时,会将时间线推进到下一个时间点。当有新的数据插入时,Flink CDC会重新计算时间线,导致捕获延迟增加。

为了解决这个问题,您可以尝试以下方法:

  1. 调整Flink CDC的时间间隔。您可以在Flink CDC的配置中设置一个较短的时间间隔,以便在没有数据变更时,Flink CDC可以更快地更新时间线。例如,您可以将maximum-interval设置为5分钟或更短的时间。
  2. 优化业务逻辑。确保您的业务逻辑可以在不触发数据变更的情况下正常运行。如果可能的话,可以考虑使用缓存或其他技术来减少对数据库的访问。
  3. 考虑使用其他CDC工具。如果您发现Flink CDC的性能问题无法通过上述方法解决,您可以考虑使用其他CDC工具,如Debezium或Kafka Connect。这些工具可能在性能方面表现更好,或者提供了更多的配置选项来满足您的需求。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575857



问题二:Flink CDC是一个作业扮演一个MySQL Slave么?


Flink CDC是一个作业扮演一个MySQL Slave么?


参考回答:

Flink CDC并不是一个作业扮演MySQL Slave的角色。Flink CDC是Flink社区开发的Flink-cdc-connector组件,主要用于从MySQL、PostgreSQL等数据库中直接读取全量数据和增量变更数据。其实现机制通常分为主动查询和事件接收两种模式。在主动查询模式下,通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息,然后通过不断的查询与上次的记录进行对比,确定数据是否有变动,从而决定是否需要同步。

对于MySQL来说,Flink提供了专门的CDC连接器,该连接器首先读取表的快照块,然后继续读取binlog。无论是在快照阶段还是读取binlog阶段,这个连接器都能保证数据的准确读取,即使任务出现故障。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575856



问题三:这种可以导入到一个flink cdc source table吗?


这边有一个数据库实例,里面几万个表,从官网上了解,好像每一个Flink CDC Table都需要扮演一个mysql slave去抓取数据。我们这个场景是不是Flink CDC不支持?我表a有两个字段是name varchar,age int 然后表b有两个字段aa double,bb int。这种可以导入到一个flink cdc source table?只有两个表有共性,类似分库分表这种场景下,这样的多个相似的表才能导入到一个Flink CDC source table吧。如果我的业务数据库就是有很多不同的业务表,这种场景不知道能不能搞定


参考回答:

Flink CDC支持将多个表的数据导入到一个Flink CDC source table中。你可以使用UNION ALL操作符将多个表的数据合并成一个结果表,然后将其作为Flink CDC source table的输入。

对于你提到的两个表a和b,它们具有相同的字段类型,因此可以直接将它们合并成一个结果表。假设你已经创建了两个表a和b的Flink SQL查询,可以使用以下代码将它们合并:

SELECT * FROM table_a
UNION ALL
SELECT * FROM table_b;

这将返回一个包含表a和b所有字段的结果表。你可以将这个结果表作为Flink CDC source table的输入,以便从数据库中捕获数据更改并将其流式传输到Flink应用程序中进行处理。

对于更复杂的分库分表场景,你可以根据业务需求编写相应的SQL查询来合并多个相似的表,并将结果表作为Flink CDC source table的输入。这样,你就可以在Flink应用程序中处理来自不同业务表的数据更改了。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575852



问题四:Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


参考回答:

在Flink CDC中,将多个大表同步到单个表的方案有以下几种:

  1. 使用UNION ALL操作符:可以将多个大表的数据通过UNION ALL操作符合并到一个结果表中。这种方式适用于两个或多个大表的结构相同,且需要保留所有数据的情况。
  2. 使用JOIN操作符:如果多个大表之间存在关联关系,可以使用JOIN操作符将它们连接在一起。这种方式适用于需要根据关联条件将不同大表的数据进行关联的情况。
  3. 使用子查询:可以在一个SELECT语句中嵌套另一个SELECT语句,将多个大表的数据作为子查询的结果进行聚合或筛选。这种方式适用于需要对多个大表的数据进行复杂处理的情况。
  4. 使用窗口函数:如果需要对多个大表的数据进行窗口计算,可以使用窗口函数将它们按照时间窗口进行分组。这种方式适用于需要对多个大表的数据进行窗口计算的情况。
  5. 使用Flink Table API:Flink Table API提供了丰富的API接口,可以方便地实现多个大表到单个表的同步。可以通过定义TableSource、TableSink和TableFunction等接口来实现数据的读取、写入和转换。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575851



问题五:FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去


FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing();

env.getCheckpointConfig().setCheckpointInterval(2000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);

env.getCheckpointConfig().setCheckpointTimeout(60000);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setParallelism(1);

EnvironmentSettings Settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
    String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'oracle-cdc',\n" +
                    " 'hostname' = '******',\n" +
                    " 'port' = '******',\n" +
                    " 'username' = '******',\n" +
                    " 'password' = '******',\n" +
                    " 'database-name' = '******',\n" +
                    " 'schema-name' = '******'," +
                    " 'table-name' = '******',\n" +
                    " 'scan.startup.mode' = 'latest-offset'\n" +
                    ")";
    String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
                    " 'url' = '******',\n" +
                    " 'username' = '****',\n" +
                    " 'password' = '****',\n" +
                    " 'table-name' = '****'\n" +
                    ")";
    String transformDmlSQL =  "insert into TS_XX_sink select * from TS_XX_source";
    tableEnv.executeSql(sourceDDL).print();
    tableEnv.executeSql(sinkDDL).print();
    tableEnv.executeSql(transformDmlSQL).print();
    env.execute("sync-cdc");
}


参考回答:

从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:

  1. 网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。
  2. Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。
  3. Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加checkpointInterval的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints的值,以便在检查点之间有更多的时间间隔。
  4. Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。
  5. 数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575383

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8小时前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8小时前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之CTAS特性只支持新增表,不支持删除表吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
Java 数据处理 Apache
实时计算 Flink版产品使用问题之lookup Join hologres的维表,是否可以指定查bitmap
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之如何实现Oracle到其他系统的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之部署完毕后,启动了一直看不到slot的个数,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用问题之是否支持tdsql
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之oracle无主键的表支持同步吗如何实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
缓存 NoSQL 数据处理
实时计算 Flink版产品使用问题之读取数据太慢该如何优化
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
22小时前
|
Oracle Java 关系型数据库
实时计算 Flink版产品使用问题之Metaspace不自动回收是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

相关产品

  • 实时计算 Flink版