实时计算 Flink版产品使用合集之将多个表的数据导入到一个 source table中如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

问题一:Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?


Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?

(1技术实现:oracle:11g/19c,java jdk:17,flink:1.14.2,flink-connector-oracle-cdc(flink-connector-debezium):2.4.1

(2)问题现象:数据同步服务自启动开始,业务上若长时间(如夜间休息时间)没有触发表数据的变更操作(如insert、update、delete),会导致flink cdc捕获到表数据的延迟较大

(3)详细描述:对oracle表的数据进行变更操作(如insert)后,flink cdc捕获到表的insert数据时,需要等待近40分钟:

(3.1)服务启动开始时间:2023-11-29 20:23:35

(3.2)服务运行期间,若短时间内业务有触发表数据的变更操作,flink cdc是可以捕获到表数据的,但之后长时间业务上没有任何的表数据变更操作,一直到时间线1

(3.3)服务运行期间,时间线1:2023-11-30 09:17:47:手动通过insert语句,直接向EBR_CLOUD_GL_HWG.GL001_EBR_PI_TEXT业务表插入了一条记录

(3.4)服务运行期间,时间线2:2023-11-30 09:58:10:flink cdc才捕获到这条insert记录,延迟了近40分钟

(4)正常现象:在不重启此服务的情况下(或者重启服务后),使用相同的数据进行同样的insert操作后,大概在2分钟内,也都可以捕获到此insert记录

(5)服务运行期间,日志无任何报错信息。


参考回答:

根据您提供的信息,这个问题可能是由于Flink CDC在长时间没有数据变更时,会将时间线推进到下一个时间点。当有新的数据插入时,Flink CDC会重新计算时间线,导致捕获延迟增加。

为了解决这个问题,您可以尝试以下方法:

  1. 调整Flink CDC的时间间隔。您可以在Flink CDC的配置中设置一个较短的时间间隔,以便在没有数据变更时,Flink CDC可以更快地更新时间线。例如,您可以将maximum-interval设置为5分钟或更短的时间。
  2. 优化业务逻辑。确保您的业务逻辑可以在不触发数据变更的情况下正常运行。如果可能的话,可以考虑使用缓存或其他技术来减少对数据库的访问。
  3. 考虑使用其他CDC工具。如果您发现Flink CDC的性能问题无法通过上述方法解决,您可以考虑使用其他CDC工具,如Debezium或Kafka Connect。这些工具可能在性能方面表现更好,或者提供了更多的配置选项来满足您的需求。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575857



问题二:Flink CDC是一个作业扮演一个MySQL Slave么?


Flink CDC是一个作业扮演一个MySQL Slave么?


参考回答:

Flink CDC并不是一个作业扮演MySQL Slave的角色。Flink CDC是Flink社区开发的Flink-cdc-connector组件,主要用于从MySQL、PostgreSQL等数据库中直接读取全量数据和增量变更数据。其实现机制通常分为主动查询和事件接收两种模式。在主动查询模式下,通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息,然后通过不断的查询与上次的记录进行对比,确定数据是否有变动,从而决定是否需要同步。

对于MySQL来说,Flink提供了专门的CDC连接器,该连接器首先读取表的快照块,然后继续读取binlog。无论是在快照阶段还是读取binlog阶段,这个连接器都能保证数据的准确读取,即使任务出现故障。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575856



问题三:这种可以导入到一个flink cdc source table吗?


这边有一个数据库实例,里面几万个表,从官网上了解,好像每一个Flink CDC Table都需要扮演一个mysql slave去抓取数据。我们这个场景是不是Flink CDC不支持?我表a有两个字段是name varchar,age int 然后表b有两个字段aa double,bb int。这种可以导入到一个flink cdc source table?只有两个表有共性,类似分库分表这种场景下,这样的多个相似的表才能导入到一个Flink CDC source table吧。如果我的业务数据库就是有很多不同的业务表,这种场景不知道能不能搞定


参考回答:

Flink CDC支持将多个表的数据导入到一个Flink CDC source table中。你可以使用UNION ALL操作符将多个表的数据合并成一个结果表,然后将其作为Flink CDC source table的输入。

对于你提到的两个表a和b,它们具有相同的字段类型,因此可以直接将它们合并成一个结果表。假设你已经创建了两个表a和b的Flink SQL查询,可以使用以下代码将它们合并:

SELECT * FROM table_a
UNION ALL
SELECT * FROM table_b;

这将返回一个包含表a和b所有字段的结果表。你可以将这个结果表作为Flink CDC source table的输入,以便从数据库中捕获数据更改并将其流式传输到Flink应用程序中进行处理。

对于更复杂的分库分表场景,你可以根据业务需求编写相应的SQL查询来合并多个相似的表,并将结果表作为Flink CDC source table的输入。这样,你就可以在Flink应用程序中处理来自不同业务表的数据更改了。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575852



问题四:Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


参考回答:

在Flink CDC中,将多个大表同步到单个表的方案有以下几种:

  1. 使用UNION ALL操作符:可以将多个大表的数据通过UNION ALL操作符合并到一个结果表中。这种方式适用于两个或多个大表的结构相同,且需要保留所有数据的情况。
  2. 使用JOIN操作符:如果多个大表之间存在关联关系,可以使用JOIN操作符将它们连接在一起。这种方式适用于需要根据关联条件将不同大表的数据进行关联的情况。
  3. 使用子查询:可以在一个SELECT语句中嵌套另一个SELECT语句,将多个大表的数据作为子查询的结果进行聚合或筛选。这种方式适用于需要对多个大表的数据进行复杂处理的情况。
  4. 使用窗口函数:如果需要对多个大表的数据进行窗口计算,可以使用窗口函数将它们按照时间窗口进行分组。这种方式适用于需要对多个大表的数据进行窗口计算的情况。
  5. 使用Flink Table API:Flink Table API提供了丰富的API接口,可以方便地实现多个大表到单个表的同步。可以通过定义TableSource、TableSink和TableFunction等接口来实现数据的读取、写入和转换。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575851



问题五:FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去


FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing();

env.getCheckpointConfig().setCheckpointInterval(2000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);

env.getCheckpointConfig().setCheckpointTimeout(60000);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setParallelism(1);

EnvironmentSettings Settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
    String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'oracle-cdc',\n" +
                    " 'hostname' = '******',\n" +
                    " 'port' = '******',\n" +
                    " 'username' = '******',\n" +
                    " 'password' = '******',\n" +
                    " 'database-name' = '******',\n" +
                    " 'schema-name' = '******'," +
                    " 'table-name' = '******',\n" +
                    " 'scan.startup.mode' = 'latest-offset'\n" +
                    ")";
    String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
                    " 'url' = '******',\n" +
                    " 'username' = '****',\n" +
                    " 'password' = '****',\n" +
                    " 'table-name' = '****'\n" +
                    ")";
    String transformDmlSQL =  "insert into TS_XX_sink select * from TS_XX_source";
    tableEnv.executeSql(sourceDDL).print();
    tableEnv.executeSql(sinkDDL).print();
    tableEnv.executeSql(transformDmlSQL).print();
    env.execute("sync-cdc");
}


参考回答:

从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:

  1. 网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。
  2. Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。
  3. Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加checkpointInterval的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints的值,以便在检查点之间有更多的时间间隔。
  4. Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。
  5. 数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575383

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
11天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
12天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
43 15
|
11天前
|
运维 分布式计算 监控
评测报告:阿里云实时计算Flink版
本评测主要针对阿里云实时计算Flink版在用户行为分析中的应用。作为一名数据分析师,我利用该服务处理了大量日志数据,包括用户点击流和登录行为。Flink的强大实时处理能力让我能够迅速洞察用户行为变化,及时调整营销策略。此外,其卓越的性能和稳定性显著降低了运维负担,提升了项目效率。产品文档详尽且易于理解,但建议增加故障排查示例。
|
10天前
|
机器学习/深度学习 运维 监控
阿里云实时计算Flink版体验评测
阿里云实时计算Flink版提供了完善的产品内引导和丰富文档,使初学者也能快速上手。产品界面引导清晰,内置模板简化了流处理任务。官方文档全面,涵盖配置、开发、调优等内容。此外,该产品在数据开发和运维方面表现优秀,支持灵活的作业开发和自动化运维。未来可增强复杂事件处理、实时可视化展示及机器学习支持,进一步提升用户体验。作为阿里云大数据体系的一部分,它能与DataWorks、MaxCompute等产品无缝联动,构建完整的实时数据处理平台。
|
2月前
|
存储 SQL 关系型数据库
实时计算 Flink版产品使用问题之如何高效地将各分片存储并跟踪每个分片的消费位置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版产品使用问题之如何处理数据并记录每条数据的变更
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
731 7
阿里云实时计算Flink在多行业的应用和实践
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面