问题一:[sql-client] 通过sql-client提交sql怎么设置checkpointing.in
通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
谢谢
*来自志愿者整理的flink邮件归档
参考答案:
现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
你可以配置在flink-conf.yaml里*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370216?spm=a2c6h.12873639.article-detail.73.6f9243783Lv0fl
问题二:flink1.11 set yarn slots failed
使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application
-Djobmanager.memory.process.size=1024m
-Dtaskmanager.memory.process.size=2048m
-ys 4 \
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0
*来自志愿者整理的flink邮件归档
参考答案:
’-t是新引入的参数,是不支持以前的-yxxx参数的
你需要使用-Dtaskmanager.numberOfTaskSlots=4这样来设置*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370215?spm=a2c6h.12873639.article-detail.74.6f9243783Lv0fl
问题三:Flink-1.11内置connector测试问题求解
小白在测试flink 1.11新特性新内置的三个connector时,在本地创建图片[1]中的任务并进行数据打印时,控制台只打印了表schema,而没有按内置的datagen connector规则产生数据,请问可能是什么原因呢?谢谢解答!
[1] https://postimg.cc/PprT9XV6
*来自志愿者整理的flink邮件归档
参考答案:
目前 1.11 版本中的 tableResult.print 只支持 exactly once 语义,需要配置 checkpoint。
1.12 里准备支持 at least once 语义,用户可以不用配置 checkpoint。目前 pr [1] 正在reivew 。
[1] https://github.com/apache/flink/pull/12867*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370213?spm=a2c6h.12873639.article-detail.75.6f9243783Lv0fl
问题四:flink 1.11 upsert结果出错
各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
trck_id VARCHAR,
score INT
) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json'
)
"""
sink="""
CREATE TABLE g_source_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g',
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source)
t_env.execute_sql(sink)
table_result1=t_env.execute_sql('''Insert into g_source_tab (trck_id
,score
) VALUES (select
trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''')
table_result1.get_job_client().get_job_execution_result().result()
*来自志愿者整理的flink邮件归档
参考答案:
这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370209?spm=a2c6h.12873639.article-detail.76.6f9243783Lv0fl
问题五:flink1.11 sql kafka 抽取事件时间
大家好!
使用flink1.11 sql接入kafka ,format为csv
从eventTime字段中抽取事件时间
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss'))
eventTime可能存在脏数据(非13位的毫秒时间戳),设置了 'csv.ignore-parse-errors' = 'true', 那么eventTime会被设置为null,此时会报一个异常:
Caused by: java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
有没有什么好的方式可以解决
*来自志愿者整理的flink邮件归档
参考答案:
我感觉可以通过计算列的方式来解决呀,你只需要在计算rowtime这个列的时候保证它不是null即可,如果是null,可以设置一个默认值之类的?*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370212?spm=a2c6h.12873639.article-detail.77.6f9243783Lv0fl