Flink Error Troubleshooting: How to Resolve CDC Task Errors

Summary: Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation; its core is a distributed streaming dataflow engine written in Java and Scala. This collection provides resources on Apache Flink technologies, usage tips, and best practices.

Question 1: Flink 1.11 CDC issue

I'm a newbie. After syncing PostgreSQL CDC data to Kafka via Debezium, I submitted a test job through Flink's SQL client. As soon as the CDC JSON data started arriving from Kafka, the job failed. The error log found through the web UI log viewer is below. Could someone help analyze this? Thanks!

CREATE TABLE pgsql_person_cdc (
  id BIGINT,
  name STRING,
  age STRING,
  sex STRING,
  phone STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'postgres.public.person',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'pgsql_cdc',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
)

CREATE TABLE sql_out (
  id BIGINT,
  name STRING,
  age STRING,
  sex STRING,
  phone STRING
) WITH (
  'connector' = 'print'
)

INSERT INTO sql_out SELECT * FROM pgsql_person_cdc;

====================================Divider======================================

Error log:

java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
    at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120) ~[flink-json-1.11.0.jar:1.11.0]
    ... 7 more

2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab).

2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age, sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab.

2020-07-22 17:22:34,461 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532).

2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring.

2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532.

====================================Divider======================================

*From the Flink mailing list archive, compiled by volunteers



Reference answer:

The code threw an NPE while setting the RowKind for the `before` part of this record; normally `before` should not be null. This looks like a problem with your data: an update changelog whose `before` is null is invalid, because the `after` data cannot be processed without the `before` data. If you confirm it is dirty data, you can enable ignore-parse-errors to skip it [1].

Best regards, Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

{ "payload": { "before": null, "after": { "id": 2, "name": "liushimin", "age": "24", "sex": "man", "phone": "155555555" }, "source": { "version": "1.2.0.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1595409754151, "snapshot": "false", "db": "postgres", "schema": "public", "table": "person", "txId": 569, "lsn": 23632344, "xmin": null }, "op": "u", "ts_ms": 1595409754270, "transaction": null } }

*From the Flink mailing list archive, compiled by volunteers
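As a sketch (this DDL is adapted from the question, not from the original thread), the option referenced in [1] goes into the source table's WITH clause. A common root cause worth checking with the Debezium PostgreSQL connector: unless the source table's REPLICA IDENTITY is set to FULL, Debezium emits update events with a null `before` image, which is exactly what this deserializer trips over.

```sql
-- Optional upstream fix (run against PostgreSQL) so Debezium
-- includes full before-images in update events:
-- ALTER TABLE public.person REPLICA IDENTITY FULL;

CREATE TABLE pgsql_person_cdc (
  id BIGINT,
  name STRING,
  age STRING,
  sex STRING,
  phone STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'postgres.public.person',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'pgsql_cdc',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true',
  -- skip records the Debezium JSON deserializer cannot process
  'debezium-json.ignore-parse-errors' = 'true'
)
```

Keep in mind that ignore-parse-errors silently drops the affected rows, so fixing the REPLICA IDENTITY upstream is usually preferable to masking dirty data.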



More answers to this question can be viewed at:

https://developer.aliyun.com/ask/371502?spm=a2c6h.12873639.article-detail.27.29d04378ApxdqJ



Question 2: Writing to MySQL with Flink 1.11 DDL

I'm using Flink 1.11.1 for CDC and found it only writes about 100 rows per second to MySQL. Is there a tuning option, or would you suggest some other batch-write approach? Code below:

String sourceDdl = "CREATE TABLE debezium_source (" +
    "  id STRING NOT NULL, name STRING, description STRING, weight Double" +
    ") WITH (" +
    "  'connector' = 'kafka-0.11'," +
    "  'topic' = 'test0717'," +
    "  'properties.bootstrap.servers' = '172.22.20.206:9092'," +
    "  'scan.startup.mode' = 'group-offsets'," +
    "  'properties.group.id' = 'test'," +
    "  'format' = 'debezium-json'," +
    "  'debezium-json.schema-include' = 'false'," +
    "  'debezium-json.ignore-parse-errors' = 'true')";
tEnv.executeSql(sourceDdl);
System.out.println("init source ddl successful ==>" + sourceDdl);

String sinkDdl = "CREATE TABLE sink (" +
    "  id STRING NOT NULL," +
    "  name STRING," +
    "  description STRING," +
    "  weight Double," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true'," +
    "  'table-name' = 'table-out'," +
    "  'driver' = 'com.mysql.cj.jdbc.Driver'," +
    "  'sink.buffer-flush.interval' = '1s'," +
    "  'sink.buffer-flush.max-rows' = '1000'," +
    "  'username' = 'DataPip'," +
    "  'password' = 'DataPip')";
tEnv.executeSql(sinkDdl);
System.out.println("init sink ddl successful ==>" + sinkDdl);

String dml = "INSERT INTO sink SELECT id, name, description, weight FROM debezium_source";
System.out.println("execute dml ==>" + dml);
tEnv.executeSql(dml);

tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print') " +
    "LIKE debezium_source (EXCLUDING ALL)");
tEnv.executeSql("INSERT INTO print_table SELECT id, name, description, weight FROM debezium_source");

*From the Flink mailing list archive, compiled by volunteers



Reference answer:

Have you observed the sink being unable to keep up and causing backpressure? You could also try increasing the flush interval, so that each buffer accumulates more data.

*From the Flink mailing list archive, compiled by volunteers
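As an illustrative sketch of that suggestion (the specific values and the `rewriteBatchedStatements` URL parameter are additions for illustration, not from the thread), the sink DDL from the question could be tuned to flush larger batches less often; `rewriteBatchedStatements=true` additionally lets the MySQL JDBC driver rewrite a batch into multi-row INSERTs:

```sql
CREATE TABLE sink (
  id STRING NOT NULL,
  name STRING,
  description STRING,
  weight DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/test?autoReconnect=true&rewriteBatchedStatements=true',
  'table-name' = 'table-out',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  -- flush less often and batch more rows per JDBC round trip
  'sink.buffer-flush.interval' = '5s',
  'sink.buffer-flush.max-rows' = '5000',
  'username' = 'DataPip',
  'password' = 'DataPip'
)
```

Whether larger buffers help depends on where the bottleneck is; checking the web UI for backpressure first, as the answer suggests, tells you whether the sink is actually the slow stage.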



More answers to this question can be viewed at:

https://developer.aliyun.com/ask/371500?spm=a2c6h.12873639.article-detail.28.29d04378ApxdqJ



Question 3: Flink Row type

I've defined row data with type ROW<rule_key STRING>. I can get the corresponding value via row.getField(i), but how do I get the corresponding field name? There doesn't seem to be any interface exposed for retrieving names. (For example, rule_key transformed into rule_key1, rulekey2.)

*From the Flink mailing list archive, compiled by volunteers



Reference answer:

This can't be done. The name is only part of the SQL planning process; at runtime it no longer carries any real meaning.

You want to get the name of each field of the Row inside a UDF, right? If so, my understanding is that this is currently not possible.

*From the Flink mailing list archive, compiled by volunteers



More answers to this question can be viewed at:

https://developer.aliyun.com/ask/370331?spm=a2c6h.12873639.article-detail.29.29d04378ApxdqJ



Question 4: Flink SQL 1.11 Kafka source with new WITH-clause options — downstream consumes no data

Following the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, I created kafka_table with the new options and the downstream consumes no data; with the legacy options the downstream can consume data. Is there a pitfall in the new-options approach?

Legacy options:

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = 'universal',
    |  'connector.topic' = 'user',
    |  'connector.startup-mode' = 'latest-offset',
    |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
    |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'connector.properties.group.id' = 'user_flink',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true'
    |)
    |""".stripMargin)

New options:

streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    |  uid BIGINT,
    |  sex VARCHAR,
    |  age INT,
    |  created_time TIMESTAMP(3),
    |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'user',
    |  'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    |  'properties.group.id' = 'user_flink',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
    |""".stripMargin)

*From the Flink mailing list archive, compiled by volunteers



Reference answer:

When you say "the downstream consumes no data", do you mean the current job itself consumes no data?

Normally this shouldn't happen. Could you provide code that reproduces it?

*From the Flink mailing list archive, compiled by volunteers



More answers to this question can be viewed at:

https://developer.aliyun.com/ask/370330?spm=a2c6h.12873639.article-detail.30.29d04378ApxdqJ



Question 5: Flink 1.11 TableFunction

I'm defining a table function by extending TableFunction, but the parameter types of the eval method are all Java primitive types (at least in every demo I've seen). Can Flink internal types be passed to eval? For example, how would I pass a Row type, i.e. eval(Row row)?

*From the Flink mailing list archive, compiled by volunteers



Reference answer:

Yes, you can. Define getResultType and getParameterTypes clearly; see [1].

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide

*From the Flink mailing list archive, compiled by volunteers
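As a minimal sketch of the answer (class and field schema are illustrative, targeting the Flink 1.11 legacy type-inference methods described in [1]), a TableFunction whose eval takes a Row declares its parameter and result types by overriding getParameterTypes and getResultType:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Hypothetical UDTF: receives a Row and emits each field as a string.
public class RowExplodeFunction extends TableFunction<String> {

    public void eval(Row row) {
        // fields are accessed positionally; as noted in Question 3,
        // field names are not available at runtime
        for (int i = 0; i < row.getArity(); i++) {
            collect(String.valueOf(row.getField(i)));
        }
    }

    @Override
    public TypeInformation<String> getResultType() {
        return Types.STRING;
    }

    @Override
    public TypeInformation<?>[] getParameterTypes(Class<?>[] signature) {
        // declare that eval's Row argument has schema ROW<rule_key STRING>
        return new TypeInformation<?>[]{Types.ROW(Types.STRING)};
    }
}
```

This is a sketch against the legacy function stack; the new type system introduced around Flink 1.11 instead uses @DataTypeHint / @FunctionHint annotations for the same purpose.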



More answers to this question can be viewed at:

https://developer.aliyun.com/ask/370327?spm=a2c6h.12873639.article-detail.31.29d04378ApxdqJ
