实时计算 Flink版产品使用合集之从MySQL到Flink 1.16.2 Flink-SQL的数据同步工作出现了一个异常如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

问题一:Flink这些完成的任务怎么删除呢?


Flink这些完成的任务怎么删除呢?


参考回答:

在Flink中,一旦任务完成,它就会被标记为已完成,并存储在任务历史中。这些已完成的任务不会被自动删除,因为它们可以被用于故障恢复、审计等用途。

如果你想手动删除这些已完成的任务,你需要在Flink的Web UI中操作。以下是步骤:

  1. 打开Flink的Web UI,找到你想要删除的任务。
  2. 在任务详情页面,点击“更多”按钮,然后选择“删除任务”。
  3. 在弹出的对话框中,确认你的选择,然后点击“确定”。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570350


问题二:使用flink cdc 实时同步oracle 到 doris,必须开启数据库的增量日志设置吗?


目前在学习 flink cdc 实时同步技术,在 mysql 中测试通过。在 oracle 测试过程中,发现只能同步oracle 的表结构,数据无法同步到 doris,也看不到相关的错误日志。

查看相关问题,文章说必须开启 oralce的supplemental_log_data_min,请问是否这个原因?公司环境的的 oracle 确实只开启了归档日志,supplemental_log_data_min确实没有开启。

另外如果开启,是否开启数据库级别的就可以。

另外有个疑问,没有开启supplemental_log_data_min的情况下,使用 ogg 同步是不是也可能会出现同步的数据偏差。


参考回答:

是的,Oracle的CDC(Change Data Capture)功能需要Supplemental Logging和Archive Logging同时开启才能正常工作。如果你的Oracle实例只开启了Archive Logging,那么Flink CDC可能只能捕获到表的DDL变更,而无法捕获到实际的DML变更。

关于Supplemental Logging,它需要在表级别开启,而不是在数据库级别。你可以通过以下SQL语句来检查某个表是否已经开启了Supplemental Logging:

SELECT * FROM cat
WHERE table_name = 'your_table_name'
AND log_mode = 'SUPPLEMENTAL';

如果返回结果为空,那么说明该表未开启Supplemental Logging。

至于OGG,它同样需要Supplemental Logging和Archive Logging同时开启才能正常工作。如果没有开启Supplemental Logging,OGG可能也无法正确捕获到数据的变更。

总的来说,无论是Flink CDC还是OGG,都需要Oracle的Supplemental Logging和Archive Logging同时开启才能正常工作。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570254


问题三:Flink CDC数据已经同步,但flink 作业还有异常?


Flink CDC从 mysql 8 到 flink 1.16.2 的 flink-sql 里,数据已经同步,但flink 作业还有异常。java.lang.RuntimeException: One or more fetchers have encountered exception

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)

at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)

at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)

at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)

at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

... 1 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=test.orders, splitId='test.orders:0', splitKeyType=[order_id INT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured.

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:306)

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:253)

at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:76)

at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)

at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)

... 6 more

Caused by: io.debezium.DebeziumException: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:120)

at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambdasubmitSplitsubmitSplitsubmitSplit0(SnapshotSplitReader.java:130)

... 5 more

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured

at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:130)

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:143)

at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:115)

... 6 more?


参考回答:

这个异常是由于Flink无法通过'SHOW MASTER STATUS'读取binlog文件名和位置。请确保您的MySQL服务器已正确配置。您可以尝试以下方法来解决这个问题:

  1. 确保MySQL服务器已启用二进制日志(binlog)。可以通过以下命令检查:
SHOW VARIABLES LIKE 'log_bin';

如果结果为OFF,请尝试将其设置为ON:

SET GLOBAL log_bin = ON;
  1. 确保MySQL用户具有足够的权限来访问二进制日志。可以通过以下命令授权:
GRANT REPLICATION SLAVE ON *.* TO 'your_mysql_user'@'your_mysql_host';
  1. 在Flink SQL中,确保使用正确的用户名、密码和主机名连接到MySQL源。例如:
CREATE TABLE your_table (
    ...
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'your_mysql_host',
    'port' = 'your_mysql_port',
    'username' = 'your_mysql_user',
    'password' = 'your_mysql_password',
    ...
);
  1. 如果问题仍然存在,请检查MySQL服务器的配置文件(如my.cnf或my.ini),确保log_bin设置正确,并重启MySQL服务器。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570246


问题四:Flink CDC有没有比较完整的学习文档?


Flink CDC有没有比较完整的学习文档?


参考回答:

Flink CDC 的学习文档相对丰富,你可以从多个途径获取学习资料。首先,你需要确保已经安装了 Apache Flink,并且具备基本的 Flink 知识和技能。然后,可以到 Flink 官方网站或者 GitHub 上下载并安装最新版本的 FlinkCDC 插件。

在了解基础操作后,你可以查看一些更深入的教程和介绍。例如"快速上手"部分将引导你基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL;而"深入讲解了最新发布的 Flink CDC 2.0.0 版本带来的核心特性"则可以让你了解到 Flink CDC 2.0.0 的主要新功能和改进。此外,“Flink CDC 概念、应用场景、数据同步,用于备份,容灾”等内容可以帮助你理解 Flink CDC 在实际工作中的使用场景。

同时,你也可以查阅相关视频教程和博客文章来进一步加深理解。最后,实践是最好的老师,你可以尝试自己搭建环境并实际操作一下,这将有助于你更好地理解和掌握 Flink CDC。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570243


问题五:Flink CDC会出现分配和之前任务相同的是吗?


Flink CDC任务指定了server-id后面在启动别的cdc任务但是没有指定server-id可能会导致新的任务去随机分配server-id的时候,会出现分配和之前任务相同的是吗?


参考回答:

别写5400-6400之间的就行,建议是必须指定serverid


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/570242

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
654 0
|
11月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
317 1
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
381 15
|
7月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
1813 45
|
7月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
590 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
11月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
664 0
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1643 27
|
9月前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
411 17
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
702 14
|
11月前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
2426 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多