实时计算 Flink版产品使用合集之从MySQL到Flink 1.16.2 Flink-SQL的数据同步工作出现了一个异常如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

问题一:Flink这些完成的任务怎么删除呢?


Flink这些完成的任务怎么删除呢?


参考回答:

在Flink中,一旦任务完成,它就会被标记为已完成,并存储在任务历史中。这些已完成的任务不会被自动删除,因为它们可以被用于故障恢复、审计等用途。

如果你想手动删除这些已完成的任务,你需要在Flink的Web UI中操作。以下是步骤:

  1. 打开Flink的Web UI,找到你想要删除的任务。
  2. 在任务详情页面,点击“更多”按钮,然后选择“删除任务”。
  3. 在弹出的对话框中,确认你的选择,然后点击“确定”。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570350


问题二:使用flink cdc 实时同步oracle 到 doris,必须开启数据库的增量日志设置吗?


目前在学习 flink cdc 实时同步技术,在 mysql 中测试通过。在 oracle 测试过程中,发现只能同步oracle 的表结构,数据无法同步到 doris,也看不到相关的错误日志。

查看相关问题,文章说必须开启 oralce的supplemental_log_data_min,请问是否这个原因?公司环境的的 oracle 确实只开启了归档日志,supplemental_log_data_min确实没有开启。

另外如果开启,是否开启数据库级别的就可以。

另外有个疑问,没有开启supplemental_log_data_min的情况下,使用 ogg 同步是不是也可能会出现同步的数据偏差。


参考回答:

是的,Oracle的CDC(Change Data Capture)功能需要Supplemental Logging和Archive Logging同时开启才能正常工作。如果你的Oracle实例只开启了Archive Logging,那么Flink CDC可能只能捕获到表的DDL变更,而无法捕获到实际的DML变更。

关于Supplemental Logging,它需要在表级别开启,而不是在数据库级别。你可以通过以下SQL语句来检查某个表是否已经开启了Supplemental Logging:

SELECT * FROM cat
WHERE table_name = 'your_table_name'
AND log_mode = 'SUPPLEMENTAL';

如果返回结果为空,那么说明该表未开启Supplemental Logging。

至于OGG,它同样需要Supplemental Logging和Archive Logging同时开启才能正常工作。如果没有开启Supplemental Logging,OGG可能也无法正确捕获到数据的变更。

总的来说,无论是Flink CDC还是OGG,都需要Oracle的Supplemental Logging和Archive Logging同时开启才能正常工作。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570254


问题三:Flink CDC数据已经同步,但flink 作业还有异常?


Flink CDC从 mysql 8 到 flink 1.16.2 的 flink-sql 里,数据已经同步,但flink 作业还有异常。java.lang.RuntimeException: One or more fetchers have encountered exception

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)

at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)

at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

... 1 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=test.orders, splitId='test.orders:0', splitKeyType=[order_id INT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured.

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:306)

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:253)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:76)

at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)

... 6 more

Caused by: io.debezium.DebeziumException: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambdasubmitSplitsubmitSplitsubmitSplit0(SnapshotSplitReader.java:130)

... 5 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:130)

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:143)

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115)

... 6 more?


参考回答:

这个异常是由于Flink无法通过'SHOW MASTER STATUS'读取binlog文件名和位置。请确保您的MySQL服务器已正确配置。您可以尝试以下方法来解决这个问题:

  1. 确保MySQL服务器已启用二进制日志(binlog)。可以通过以下命令检查:
SHOW VARIABLES LIKE 'log_bin';

如果结果为OFF,请尝试将其设置为ON:

SET GLOBAL log_bin = ON;
  1. 确保MySQL用户具有足够的权限来访问二进制日志。可以通过以下命令授权:
GRANT REPLICATION SLAVE ON *.* TO 'your_mysql_user'@'your_mysql_host';
  1. 在Flink SQL中,确保使用正确的用户名、密码和主机名连接到MySQL源。例如:
CREATE TABLE your_table (
    ...
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'your_mysql_host',
    'port' = 'your_mysql_port',
    'username' = 'your_mysql_user',
    'password' = 'your_mysql_password',
    ...
);
  1. 如果问题仍然存在,请检查MySQL服务器的配置文件(如my.cnf或my.ini),确保log_bin设置正确,并重启MySQL服务器。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570246


问题四:Flink CDC有没有比较完整的学习文档?


Flink CDC有没有比较完整的学习文档?


参考回答:

Flink CDC 的学习文档相对丰富,你可以从多个途径获取学习资料。首先,你需要确保已经安装了 Apache Flink,并且具备基本的 Flink 知识和技能。然后,可以到 Flink 官方网站或者 GitHub 上下载并安装最新版本的 FlinkCDC 插件。

在了解基础操作后,你可以查看一些更深入的教程和介绍。例如"快速上手"部分将引导你基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL;而"深入讲解了最新发布的 Flink CDC 2.0.0 版本带来的核心特性"则可以让你了解到 Flink CDC 2.0.0 的主要新功能和改进。此外,“Flink CDC 概念、应用场景、数据同步,用于备份,容灾”等内容可以帮助你理解 Flink CDC 在实际工作中的使用场景。

同时,你也可以查阅相关视频教程和博客文章来进一步加深理解。最后,实践是最好的老师,你可以尝试自己搭建环境并实际操作一下,这将有助于你更好地理解和掌握 Flink CDC。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570243


问题五:Flink CDC会出现分配和之前任务相同的是吗?


Flink CDC任务指定了server-id后面在启动别的cdc任务但是没有指定server-id可能会导致新的任务去随机分配server-id的时候,会出现分配和之前任务相同的是吗?


参考回答:

别写5400-6400之间的就行,建议是必须指定serverid


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570242

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2天前
|
分布式计算 DataWorks 监控
MaxCompute产品使用问题之如何将MaxCompute中的数据同步到ClickHouse的分区表中
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
9天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
295 0
|
10天前
|
SQL 监控 Java
实时计算 Flink版产品使用问题之在进行数据同步时,修改了YAML文件以增加新的同步表并取消了之前的同步任务,如何从之前的检查点继续执行
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
15天前
|
运维 数据管理 数据库
数据管理DMS产品使用合集之如何实现跨库的数据同步
阿里云数据管理DMS提供了全面的数据管理、数据库运维、数据安全、数据迁移与同步等功能,助力企业高效、安全地进行数据库管理和运维工作。以下是DMS产品使用合集的详细介绍。
|
15天前
|
JSON 分布式计算 DataWorks
MaxCompute产品使用合集之如何将JSON格式数据同步到MongoDB
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 关系型数据库 MySQL
关系型数据库MySQL的MyISAM
【6月更文挑战第17天】
20 11
|
1天前
|
SQL 关系型数据库 MySQL
关系型数据库mysql的CSV
【6月更文挑战第18天】
18 6
|
3天前
|
存储 关系型数据库 MySQL
|
3天前
|
SQL 关系型数据库 MySQL
|
1天前
|
存储 监控 关系型数据库
关系型数据库mysql的BLACKHOLE
【6月更文挑战第18天】
17 4

热门文章

最新文章

相关产品

  • 实时计算 Flink版