Flink报错问题之SQL报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:SQL 报错只有 flink runtime 的 NPE

我有这么一个 SQL INSERT INTO es SELECT a, udf_xxx(b) FROM mongo_oplog -- 自定义 TableFactory

Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现

LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1) (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.

java.lang.NullPointerException

at StreamExecCalc 8016.split8016.split7938$(Unknown Source)

at StreamExecCalc$8016.processElement(Unknown Source)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)

at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)

at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)

at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)

at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)

at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

请问这种怎样情况排查问题? 有任何线索都可以

*来自志愿者整理的flink邮件归档



参考答案:

这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。

你可以试试先做个 where 条件过滤,将 null 值过滤掉?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370302?spm=a2c6h.12873639.article-detail.47.29d04378ApxdqJ



问题二:sql-client] 如何绕过交互式模式去做ddl

众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,

这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!

*来自志愿者整理的flink邮件归档



参考答案:

你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。

社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。

不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370301?spm=a2c6h.12873639.article-detail.49.6f9243783Lv0fl



问题三:flink table同时作为写出及输入时下游无数据

各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()

env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

env.set_parallelism(1)

env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()

t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

kafka_source_ddl = """

CREATE TABLE kafka_source_tab (

id VARCHAR,    

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'kafka',

'topic' = 'gg',    

'scan.startup.mode' = 'specific-offsets',  

'scan.startup.specific-offsets'='partition:1,offset:0',

'properties.bootstrap.servers' = '****',

'format' = 'json'

)

"""

g_unit_sink_ddl = """

CREATE TABLE g_sink_unit (

alarm_id VARCHAR,    

trck_id VARCHAR

 

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_unit',    

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'    

)

"""

g_summary_ddl = """

CREATE TABLE g_summary_base(

alarm_id VARCHAR,    

trck_id VARCHAR

) WITH (

'connector' = 'jdbc',

'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',

'table-name' = 'g_summary',  

'username' = 'root',

'password' = 'root',

'sink.buffer-flush.interval' = '1s'

)

"""

t_env.execute_sql(kafka_source_ddl)

t_env.execute_sql(g_unit_sink_ddl)

t_env.execute_sql(g_summary_ddl)

sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab'''

sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl'''

stmt_set = t_env.create_statement_set()

stmt_set.add_insert_sql(sql1)

stmt_set.add_insert_sql(sql2)

stmt_set.execute().get_job_client().get_job_execution_result().result()

*来自志愿者整理的flink邮件归档



参考答案:

你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。

另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:

sql1='''Insert into g_sink_unit select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_base select alarm_id,trck_id from kafka_source_tab;'''*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370300?spm=a2c6h.12873639.article-detail.50.6f9243783Lv0fl



问题四:flink 1.11 on k8s native session cluster模式报找不到conf

大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。

下载的flink-1.11.0-bin-scala_2.11.tgz

执行命令是

./bin/kubernetes-session.sh \

-Dkubernetes.cluster-id=k8s-session-1 \

-Dtaskmanager.memory.process.size=4096m \

-Dkubernetes.taskmanager.cpu=2 \

-Dtaskmanager.numberOfTaskSlots=4 \

-Dresourcemanager.taskmanager-timeout=3600000 \

-Dkubernetes.container.image=flink:1.11.0-scala_2.11

但是会报错,找不到configmap

[image: image.png]

我看是执行上述命令是会生成configmap的,为什么还会报找不到。

*来自志愿者整理的flink邮件归档



参考答案:

configmap "flink-config-k8s-session-1" not found的报错是正常的

因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment 所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod

你现在是任务起不来吗,还是有什么其他的问题?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370299?spm=a2c6h.12873639.article-detail.51.6f9243783Lv0fl



问题五:FlinkSQL 是通FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?Flink

比如:

CREATE TABLE my_table (

id BIGINT,

first_name STRING,

last_name STRING,

email STRING

) WITH (

'connector'='kafka',

'topic'='user_topic',

'properties.bootstrap.servers'='localhost:9092',

'scan.startup.mode'='earliest-offset',

'format'='debezium-json'

);

最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码

但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?

*来自志愿者整理的flink邮件归档



参考答案:

通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370297?spm=a2c6h.12873639.article-detail.52.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL Java 数据库连接
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
【YashanDB知识库】解决mybatis的mapper文件sql语句结尾加分号";"报错
|
6月前
|
SQL
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
【YashanDB知识库】使用leading hint调整SQL执行计划后报错YAS-04522 invalid hint leading
|
6月前
|
SQL Java 数据库连接
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
【YashanDB 知识库】解决 mybatis 的 mapper 文件 sql 语句结尾加分号";"报错
|
7月前
|
SQL 数据库
数据库数据恢复—SQL Server报错“错误 823”的数据恢复案例
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复。
|
7月前
|
SQL
【YashanDB 知识库】使用 leading hint 调整 SQL 执行计划后报错 YAS-04522 invalid hint leading
在 YashanDB 的所有版本中,使用 leading hint 调整 SQL 执行计划时可能出现“YAS-04522 invalid hint leading”错误,导致 SQL 无法正常执行。原因是 YashanDB 优化器的 Bug。解决方法为避免使用 leading hint。可通过创建测试表 a、b、c 并执行特定 SQL 语句来验证问题是否存在。
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1643 27
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
702 14
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
434 13
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。
275 9

相关产品

  • 实时计算 Flink版