Flink报错问题之SQL报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:SQL 报错只有 flink runtime 的 NPE

我有这么一个 SQL INSERT INTO es SELECT a, udf_xxx(b) FROM mongo_oplog -- 自定义 TableFactory

Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现

LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1) (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.

java.lang.NullPointerException

at StreamExecCalc 8016.split8016.split7938$(Unknown Source)

at StreamExecCalc$8016.processElement(Unknown Source)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

请问这种怎样情况排查问题? 有任何线索都可以

*来自志愿者整理的flink邮件归档



参考答案:

这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。

你可以试试先做个 where 条件过滤,将 null 值过滤掉?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370302?spm=a2c6h.12873639.article-detail.47.29d04378ApxdqJ



问题二:sql-client] 如何绕过交互式模式去做ddl

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,

这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。

社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。

不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370301?spm=a2c6h.12873639.article-detail.49.6f9243783Lv0fl



问题三:flink table同时作为写出及输入时下游无数据

各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

kafka_source_ddl = """

CREATE TABLE kafka_source_tab (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'gg',    

'scan.startup.mode' = 'specific-offsets',  

'scan.startup.specific-offsets'='partition:1,offset:0',

'properties.bootstrap.servers' = '****',

'format' = 'json'

)

"""

g_unit_sink_ddl = """

CREATE TABLE g_sink_unit (

alarm_id VARCHAR,    

trck_id VARCHAR

 

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_unit',    

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'    

)

"""

g_summary_ddl = """

CREATE TABLE g_summary_base(

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_summary',  

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'

)

"""

t_env.execute_sql(kafka_source_ddl)

t_env.execute_sql(g_unit_sink_ddl)

t_env.execute_sql(g_summary_ddl)

sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab'''

sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl'''

stmt_set = t_env.create_statement_set()

stmt_set.add_insert_sql(sql1)

stmt_set.add_insert_sql(sql2)

stmt_set.execute().get_job_client().get_job_execution_result().result()

*来自志愿者整理的flink邮件归档



参考答案:

你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_base select alarm_id,trck_id from kafka_source_tab;'''*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370300?spm=a2c6h.12873639.article-detail.50.6f9243783Lv0fl



问题四:flink 1.11 on k8s native session cluster模式报找不到conf

大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。

下载的flink-1.11.0-bin-scala_2.11.tgz

执行命令是

./bin/kubernetes-session.sh \

-Dkubernetes.cluster-id=k8s-session-1 \

-Dtaskmanager.memory.process.size=4096m \

-Dkubernetes.taskmanager.cpu=2 \

-Dtaskmanager.numberOfTaskSlots=4 \

-Dresourcemanager.taskmanager-timeout=3600000 \

-Dkubernetes.container.image=flink:1.11.0-scala_2.11

但是会报错,找不到configmap

[image: image.png]

我看是执行上述命令是会生成configmap的,为什么还会报找不到。

*来自志愿者整理的flink邮件归档



参考答案:

configmap "flink-config-k8s-session-1" not found的报错是正常的

因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment 所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod

你现在是任务起不来吗,还是有什么其他的问题?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370299?spm=a2c6h.12873639.article-detail.51.6f9243783Lv0fl



问题五:FlinkSQL 是通FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?Flink

比如:

CREATE TABLE my_table (

id BIGINT,

first_name STRING,

last_name STRING,

email STRING

) WITH (

'connector'='kafka',

'topic'='user_topic',

'properties.bootstrap.servers'='localhost:9092',

'scan.startup.mode'='earliest-offset',

'format'='debezium-json'

);

最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码

但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

*来自志愿者整理的flink邮件归档



参考答案:

通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370297?spm=a2c6h.12873639.article-detail.52.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 插件问题之报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
20天前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
94 0
|
21天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
32 2
|
25天前
|
Java 关系型数据库 MySQL
Flink CDC有见这个报错不?
【2月更文挑战第29天】Flink CDC有见这个报错不?
17 2
|
28天前
|
存储 关系型数据库 MySQL
Flink CDC产品常见问题之写hudi的时候报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
28天前
|
资源调度 关系型数据库 测试技术
Flink CDC产品常见问题之没有报错但是一直监听不到数据如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
28天前
|
缓存 监控 Java
Flink CDC产品常见问题之flink集群jps命令报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
SQL 数据库 索引
解决SQL报错:索引中丢失IN或OUT參数
解决SQL报错:索引中丢失IN或OUT參数
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之用superset连接starrocks报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
Oracle 关系型数据库 MySQL
flink cdc 增量问题之增量数据会报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。

相关产品

  • 实时计算 Flink版