实时计算 Flink版产品使用合集之将多个表的数据导入到一个 source table中如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

问题一:Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?


Flink CDC项目组内最近遇到一个flink cdc的技术难点,谁有相关经验的,有空帮忙看看?

(1技术实现:oracle:11g/19c,java jdk:17,flink:1.14.2,flink-connector-oracle-cdc(flink-connector-debezium):2.4.1

(2)问题现象:数据同步服务自启动开始,业务上若长时间(如夜间休息时间)没有触发表数据的变更操作(如insert、update、delete),会导致flink cdc捕获到表数据的延迟较大

(3)详细描述:对oracle表的数据进行变更操作(如insert)后,flink cdc捕获到表的insert数据时,需要等待近40分钟:

(3.1)服务启动开始时间:2023-11-29 20:23:35

(3.2)服务运行期间,若短时间内业务有触发表数据的变更操作,flink cdc是可以捕获到表数据的,但之后长时间业务上没有任何的表数据变更操作,一直到时间线1

(3.3)服务运行期间,时间线1:2023-11-30 09:17:47:手动通过insert语句,直接向EBR_CLOUD_GL_HWG.GL001_EBR_PI_TEXT业务表插入了一条记录

(3.4)服务运行期间,时间线2:2023-11-30 09:58:10:flink cdc才捕获到这条insert记录,延迟了近40分钟

(4)正常现象:在不重启此服务的情况下(或者重启服务后),使用相同的数据进行同样的insert操作后,大概在2分钟内,也都可以捕获到此insert记录

(5)服务运行期间,日志无任何报错信息。


参考回答:

根据您提供的信息,这个问题可能是由于Flink CDC在长时间没有数据变更时,会将时间线推进到下一个时间点。当有新的数据插入时,Flink CDC会重新计算时间线,导致捕获延迟增加。

为了解决这个问题,您可以尝试以下方法:

  1. 调整Flink CDC的时间间隔。您可以在Flink CDC的配置中设置一个较短的时间间隔,以便在没有数据变更时,Flink CDC可以更快地更新时间线。例如,您可以将maximum-interval设置为5分钟或更短的时间。
  2. 优化业务逻辑。确保您的业务逻辑可以在不触发数据变更的情况下正常运行。如果可能的话,可以考虑使用缓存或其他技术来减少对数据库的访问。
  3. 考虑使用其他CDC工具。如果您发现Flink CDC的性能问题无法通过上述方法解决,您可以考虑使用其他CDC工具,如Debezium或Kafka Connect。这些工具可能在性能方面表现更好,或者提供了更多的配置选项来满足您的需求。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575857



问题二:Flink CDC是一个作业扮演一个MySQL Slave么?


Flink CDC是一个作业扮演一个MySQL Slave么?


参考回答:

Flink CDC并不是一个作业扮演MySQL Slave的角色。Flink CDC是Flink社区开发的Flink-cdc-connector组件,主要用于从MySQL、PostgreSQL等数据库中直接读取全量数据和增量变更数据。其实现机制通常分为主动查询和事件接收两种模式。在主动查询模式下,通常会在数据源表的某个字段中保存上次更新的时间戳或版本号等信息,然后通过不断的查询与上次的记录进行对比,确定数据是否有变动,从而决定是否需要同步。

对于MySQL来说,Flink提供了专门的CDC连接器,该连接器首先读取表的快照块,然后继续读取binlog。无论是在快照阶段还是读取binlog阶段,这个连接器都能保证数据的准确读取,即使任务出现故障。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575856



问题三:这种可以导入到一个flink cdc source table吗?


这边有一个数据库实例,里面几万个表,从官网上了解,好像每一个Flink CDC Table都需要扮演一个mysql slave去抓取数据。我们这个场景是不是Flink CDC不支持?我表a有两个字段是name varchar,age int 然后表b有两个字段aa double,bb int。这种可以导入到一个flink cdc source table?只有两个表有共性,类似分库分表这种场景下,这样的多个相似的表才能导入到一个Flink CDC source table吧。如果我的业务数据库就是有很多不同的业务表,这种场景不知道能不能搞定


参考回答:

Flink CDC支持将多个表的数据导入到一个Flink CDC source table中。你可以使用UNION ALL操作符将多个表的数据合并成一个结果表,然后将其作为Flink CDC source table的输入。

对于你提到的两个表a和b,它们具有相同的字段类型,因此可以直接将它们合并成一个结果表。假设你已经创建了两个表a和b的Flink SQL查询,可以使用以下代码将它们合并:

SELECT * FROM table_a
UNION ALL
SELECT * FROM table_b;

这将返回一个包含表a和b所有字段的结果表。你可以将这个结果表作为Flink CDC source table的输入,以便从数据库中捕获数据更改并将其流式传输到Flink应用程序中进行处理。

对于更复杂的分库分表场景,你可以根据业务需求编写相应的SQL查询来合并多个相似的表,并将结果表作为Flink CDC source table的输入。这样,你就可以在Flink应用程序中处理来自不同业务表的数据更改了。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575852



问题四:Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


Flink CDC中flink sql 多表(大表)同步到单表有什么好方案吗?


参考回答:

在Flink CDC中,将多个大表同步到单个表的方案有以下几种:

  1. 使用UNION ALL操作符:可以将多个大表的数据通过UNION ALL操作符合并到一个结果表中。这种方式适用于两个或多个大表的结构相同,且需要保留所有数据的情况。
  2. 使用JOIN操作符:如果多个大表之间存在关联关系,可以使用JOIN操作符将它们连接在一起。这种方式适用于需要根据关联条件将不同大表的数据进行关联的情况。
  3. 使用子查询:可以在一个SELECT语句中嵌套另一个SELECT语句,将多个大表的数据作为子查询的结果进行聚合或筛选。这种方式适用于需要对多个大表的数据进行复杂处理的情况。
  4. 使用窗口函数:如果需要对多个大表的数据进行窗口计算,可以使用窗口函数将它们按照时间窗口进行分组。这种方式适用于需要对多个大表的数据进行窗口计算的情况。
  5. 使用Flink Table API:Flink Table API提供了丰富的API接口,可以方便地实现多个大表到单个表的同步。可以通过定义TableSource、TableSink和TableFunction等接口来实现数据的读取、写入和转换。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575851



问题五:FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去


FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing();

env.getCheckpointConfig().setCheckpointInterval(2000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);

env.getCheckpointConfig().setCheckpointTimeout(60000);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setParallelism(1);

EnvironmentSettings Settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
    String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'oracle-cdc',\n" +
                    " 'hostname' = '******',\n" +
                    " 'port' = '******',\n" +
                    " 'username' = '******',\n" +
                    " 'password' = '******',\n" +
                    " 'database-name' = '******',\n" +
                    " 'schema-name' = '******'," +
                    " 'table-name' = '******',\n" +
                    " 'scan.startup.mode' = 'latest-offset'\n" +
                    ")";
    String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
                    " 'url' = '******',\n" +
                    " 'username' = '****',\n" +
                    " 'password' = '****',\n" +
                    " 'table-name' = '****'\n" +
                    ")";
    String transformDmlSQL =  "insert into TS_XX_sink select * from TS_XX_source";
    tableEnv.executeSql(sourceDDL).print();
    tableEnv.executeSql(sinkDDL).print();
    tableEnv.executeSql(transformDmlSQL).print();
    env.execute("sync-cdc");
}


参考回答:

从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:

  1. 网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。
  2. Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。
  3. Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加checkpointInterval的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints的值,以便在检查点之间有更多的时间间隔。
  4. Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。
  5. 数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。


关于本问题的更多回答可点击原文查看:

https://developer.aliyun.com/ask/575383

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
23天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
808 17
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
20天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。
zdl
|
12天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
44 0
|
1月前
|
数据可视化 大数据 数据处理
评测报告:实时计算Flink版产品体验
实时计算Flink版提供了丰富的文档和产品引导,帮助初学者快速上手。其强大的实时数据处理能力和多数据源支持,满足了大部分业务需求。但在高级功能、性能优化和用户界面方面仍有改进空间。建议增加更多自定义处理函数、数据可视化工具,并优化用户界面,增强社区互动,以提升整体用户体验和竞争力。
40 2
|
1月前
|
运维 搜索推荐 数据安全/隐私保护
阿里云实时计算Flink版测评报告
阿里云实时计算Flink版在用户行为分析与标签画像场景中表现出色,通过实时处理电商平台用户行为数据,生成用户兴趣偏好和标签,提升推荐系统效率。该服务具备高稳定性、低延迟、高吞吐量,支持按需计费,显著降低运维成本,提高开发效率。
70 1
|
1月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
1月前
|
运维 监控 Serverless
阿里云实时计算Flink版评测报告
阿里云实时计算Flink版是一款全托管的Serverless实时流处理服务,基于Apache Flink构建,提供企业级增值功能。本文从稳定性、性能、开发运维、安全性和成本效益等方面全面评测该产品,展示其在实时数据处理中的卓越表现和高投资回报率。
|
1月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
44 0
|
SQL 存储 运维
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
本次分享主要介绍阿里云实时计算平台从 2.0 基于 Yarn 的架构到 3.0 云原生时代的演进,以及在 3.0 平台上一些核心功能的建设实践,如健康分,智能诊断,细粒度资源,作业探查以及企业级安全的建设等。
如何降低 Flink 开发和运维成本?阿里云实时计算平台建设实践
|
存储 SQL 分布式计算
《Apache Flink 案例集(2022版)》——2.数据分析——汽车之家-Flink 的实时计算平台 3.0 建设实践
《Apache Flink 案例集(2022版)》——2.数据分析——汽车之家-Flink 的实时计算平台 3.0 建设实践
266 0

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面