问题一:SQL 报错只有 flink runtime 的 NPE
我有这么一个 SQL INSERT INTO es SELECT a, udf_xxx(b) FROM mongo_oplog -- 自定义 TableFactory
Job 提交后 fail 了,从 Job 提交到 Fail 只有一处来自非业务代码的 NPE 如下,没有任何业务代码 Exception,可以稳定重现
LUE _UTF-16LE'v2'))) AS return_received_time]) (1/1) (bdf9b131f82a8ebc440165b82b89e570) switched from RUNNING to FAILED.
java.lang.NullPointerException
at StreamExecCalc 8016.split8016.split7938$(Unknown Source)
at StreamExecCalc$8016.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
请问这种怎样情况排查问题? 有任何线索都可以
*来自志愿者整理的flink邮件归档
参考答案:
这个异常一般是由于 UDF 的实现用了主类型(int),但是实际的字段值有 null 值。
你可以试试先做个 where 条件过滤,将 null 值过滤掉?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370302?spm=a2c6h.12873639.article-detail.47.29d04378ApxdqJ
问题二:sql-client] 如何绕过交互式模式去做ddl
众所周知,sql-client.sh的非交互模式下的-u是不支持ddl的,现在我们是用代码来调用sql-client.sh来做ddl,
这样在交互模式如何去做。 通过hack sql client代码可以实现,但是不改代码的情况下有没有什么最佳实践。谢谢!
*来自志愿者整理的flink邮件归档
参考答案:
你想要的是类似于 sql-client.sh -u 的功能,直接通过命令行去执行 ddl 是么?非常抱歉,目前这是不支持的。
社区的e2e测试目前也是通过 Java 代码来调用 sql-client.sh 来实现执行 ddl 的。
不过社区是有计划支持 sql-client.sh 执行一个 sql 文件的,可以关注下FLINK-12828.*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370301?spm=a2c6h.12873639.article-detail.49.6f9243783Lv0fl
问题三:flink table同时作为写出及输入时下游无数据
各位大佬好,请教一个问题,就是在flink内部定义一个表g_unit(初始为空),接受一个kafka源的写入,同时g_unit又要作为下游表g_summary的输入源,测试发现g_line表一直不会写入数据,代码如下,烦请大佬解答。
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
kafka_source_ddl = """
CREATE TABLE kafka_source_tab (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'gg',
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets'='partition:1,offset:0',
'properties.bootstrap.servers' = '****',
'format' = 'json'
)
"""
g_unit_sink_ddl = """
CREATE TABLE g_sink_unit (
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_unit',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""
g_summary_ddl = """
CREATE TABLE g_summary_base(
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.70:3306/bdoa?useSSL=false',
'table-name' = 'g_summary',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.interval' = '1s'
)
"""
t_env.execute_sql(kafka_source_ddl)
t_env.execute_sql(g_unit_sink_ddl)
t_env.execute_sql(g_summary_ddl)
sql1='''Insert into g_unit_sink_ddl select alarm_id,trck_id from kafka_source_tab'''
sql2='''Insert into g_summary_ddl select alarm_id,trck_id from g_unit_sink_ddl'''
stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(sql1)
stmt_set.add_insert_sql(sql2)
stmt_set.execute().get_job_client().get_job_execution_result().result()
*来自志愿者整理的flink邮件归档
参考答案:
你可以先只跑第一个insert,然后check一下g_sink_unit是否有数据。
另外,你可以把query 改为都读取 kafka_source_tab 再分别写到两个不同的sink:
sql1='''Insert into g_sink_unit select alarm_id,trck_id from kafka_source_tab''' sql2='''Insert into g_summary_base select alarm_id,trck_id from kafka_source_tab;'''*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370300?spm=a2c6h.12873639.article-detail.50.6f9243783Lv0fl
问题四:flink 1.11 on k8s native session cluster模式报找不到conf
大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
下载的flink-1.11.0-bin-scala_2.11.tgz
执行命令是
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.container.image=flink:1.11.0-scala_2.11
但是会报错,找不到configmap
[image: image.png]
我看是执行上述命令是会生成configmap的,为什么还会报找不到。
*来自志愿者整理的flink邮件归档
参考答案:
configmap "flink-config-k8s-session-1" not found的报错是正常的
因为目前的实现是先创建JobManager Deployment,然后再创建ConfigMap并设置owner reference到deployment 所以你才会看到创建Pod的时候报ConfigMap还没有创建出来,这个是正常的信息,K8s会自动重试创建Pod
你现在是任务起不来吗,还是有什么其他的问题?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370299?spm=a2c6h.12873639.article-detail.51.6f9243783Lv0fl
问题五:FlinkSQL 是通FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?Flink
比如:
CREATE TABLE my_table (
id BIGINT,
first_name STRING,
last_name STRING,
email STRING
) WITH (
'connector'='kafka',
'topic'='user_topic',
'properties.bootstrap.servers'='localhost:9092',
'scan.startup.mode'='earliest-offset',
'format'='debezium-json'
);
最终解析 debezium-json 应该是 flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium 下面的代码
但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
*来自志愿者整理的flink邮件归档
参考答案:
通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
[1]
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370297?spm=a2c6h.12873639.article-detail.52.6f9243783Lv0fl