Flink报错问题之SQL报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:SQL 报错只有 flink runtime 的 NPE

我有这么一个 SQL INSERT INTO es SELECT a, udf_xxx(b) FROM mongo_oplog -- 自定义 TableFactory

Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现

LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1) (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.

java.lang.NullPointerException

at StreamExecCalc 8016.split8016.split7938$(Unknown Source)

at StreamExecCalc$8016.processElement(Unknown Source)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

请问这种怎样情况排查问题? 有任何线索都可以

*来自志愿者整理的flink邮件归档



参考答案:

这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。

你可以试试先做个 where 条件过滤,将 null 值过滤掉?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370302?spm=a2c6h.12873639.article-detail.47.29d04378ApxdqJ



问题二:sql-client] 如何绕过交互式模式去做ddl

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,

这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。

社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。

不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370301?spm=a2c6h.12873639.article-detail.49.6f9243783Lv0fl



问题三:flink table同时作为写出及输入时下游无数据

各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

kafka_source_ddl = """

CREATE TABLE kafka_source_tab (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'gg',    

'scan.startup.mode' = 'specific-offsets',  

'scan.startup.specific-offsets'='partition:1,offset:0',

'properties.bootstrap.servers' = '****',

'format' = 'json'

)

"""

g_unit_sink_ddl = """

CREATE TABLE g_sink_unit (

alarm_id VARCHAR,    

trck_id VARCHAR

 

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_unit',    

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'    

)

"""

g_summary_ddl = """

CREATE TABLE g_summary_base(

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_summary',  

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'

)

"""

t_env.execute_sql(kafka_source_ddl)

t_env.execute_sql(g_unit_sink_ddl)

t_env.execute_sql(g_summary_ddl)

sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab'''

sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl'''

stmt_set = t_env.create_statement_set()

stmt_set.add_insert_sql(sql1)

stmt_set.add_insert_sql(sql2)

stmt_set.execute().get_job_client().get_job_execution_result().result()

*来自志愿者整理的flink邮件归档



参考答案:

你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_base select alarm_id,trck_id from kafka_source_tab;'''*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370300?spm=a2c6h.12873639.article-detail.50.6f9243783Lv0fl



问题四:flink 1.11 on k8s native session cluster模式报找不到conf

大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。

下载的flink-1.11.0-bin-scala_2.11.tgz

执行命令是

./bin/kubernetes-session.sh \

-Dkubernetes.cluster-id=k8s-session-1 \

-Dtaskmanager.memory.process.size=4096m \

-Dkubernetes.taskmanager.cpu=2 \

-Dtaskmanager.numberOfTaskSlots=4 \

-Dresourcemanager.taskmanager-timeout=3600000 \

-Dkubernetes.container.image=flink:1.11.0-scala_2.11

但是会报错,找不到configmap

[image: image.png]

我看是执行上述命令是会生成configmap的,为什么还会报找不到。

*来自志愿者整理的flink邮件归档



参考答案:

configmap "flink-config-k8s-session-1" not found的报错是正常的

因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment 所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod

你现在是任务起不来吗,还是有什么其他的问题?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370299?spm=a2c6h.12873639.article-detail.51.6f9243783Lv0fl



问题五:FlinkSQL 是通FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?Flink

比如:

CREATE TABLE my_table (

id BIGINT,

first_name STRING,

last_name STRING,

email STRING

) WITH (

'connector'='kafka',

'topic'='user_topic',

'properties.bootstrap.servers'='localhost:9092',

'scan.startup.mode'='earliest-offset',

'format'='debezium-json'

);

最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码

但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

*来自志愿者整理的flink邮件归档



参考答案:

通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370297?spm=a2c6h.12873639.article-detail.52.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
14天前
|
SQL 关系型数据库 MySQL
这样的SQL执行为什么不会报错?optimizer_trace深度历险
【10月更文挑战第12天】本文探讨了一条看似错误但实际上能成功执行的SQL语句,通过开启MySQL的优化器追踪功能,详细分析了SQL的执行过程,揭示了子查询被优化器解析为连接操作的原因,最终解释了为何该SQL不会报错。文章不仅增进了对SQL优化机制的理解,也展示了如何利用优化器追踪解决实际问题。
|
2月前
|
SQL 数据库
数据库数据恢复—SQL Server数据库报错“错误823”的数据恢复案例
SQL Server附加数据库出现错误823,附加数据库失败。数据库没有备份,无法通过备份恢复数据库。 SQL Server数据库出现823错误的可能原因有:数据库物理页面损坏、数据库物理页面校验值损坏导致无法识别该页面、断电或者文件系统问题导致页面丢失。
100 12
数据库数据恢复—SQL Server数据库报错“错误823”的数据恢复案例
|
2月前
|
SQL 数据库
SQL解析相关报错
SQL解析相关报错
44 5
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
152 15
|
29天前
|
SQL 关系型数据库 MySQL
|
2月前
|
关系型数据库 MySQL Nacos
nacos启动报错 load derby-schema.sql error
这篇文章描述了作者在使用Nacos时遇到的启动错误,错误提示为加载derby-schema.sql失败,作者通过将数据库从Derby更换为MySQL解决了问题。
nacos启动报错 load derby-schema.sql error
|
2月前
|
关系型数据库 MySQL Java
flywa报错java.sql.SQLSyntaxErrorException: Unknown database ‘flyway‘
flywa报错java.sql.SQLSyntaxErrorException: Unknown database ‘flyway‘
33 1
|
25天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
40 0
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
98 13

相关产品

  • 实时计算 Flink版