问题一:flink state问题
大家好
我有一个去重的需求,想节省内存用的bloomfilter,代码如下:
.keyBy(_._1).process(new KeyedProcessFunctionString,(String,String),String {
var state:ValueState[BloomFilter[CharSequence]]= null
override def open(parameters: Configuration): Unit = {
val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHintBloomFilter[CharSequence]{}))
state = getRuntimeContext.getState(stateDesc)
}
override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {
var filter = state.value
if(filter==null){
println("null filter")
filter= BloomFilter.createCharSequence}
//val contains = filter.mightContain(value._2)
if(!filter.mightContain(value._2)) {
filter.put(value._2)
state.update(filter)
out.collect(value._2)
}
}
})
通过日志我看到每次我从savepoint恢复的时候这个state里面的bloomfilter都是null,这是为什么啊
参考回答:
你可以尝试用 state-process-api[1] 看一下 savepoint 中 state 的内容,先缩小一下问题的范围,如果
savepoint 中就没有了,那就是序列化到 savepoint 的时候出错了,savepoitn 是有的,那么就是恢复的时候出错了。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372081
问题二:[sql-client] 通过sql-client提交sql怎么设置checkpointing.in
hi flink users
通过sql-client提交sql怎么设置checkpointing.interval?
我看了一下sql-client-defaults.yaml中的execution, 并没有发现这个参数。请教大家一下。
参考回答:
现在还不支持在sql-client-defaults.yaml 里配置 checkpointing.interval,
你可以配置在flink-conf.yaml里
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372085
问题三:flink1.11 set yarn slots failed
使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application
-Djobmanager.memory.process.size=1024m
-Dtaskmanager.memory.process.size=2048m
-ys 4 \
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0
参考回答:
-t是新引入的参数,是不支持以前的-yxxx参数的
你需要使用-Dtaskmanager.numberOfTaskSlots=4这样来设置
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372090
问题四:Flink-1.11内置connector测试问题求解
小白在测试flink 1.11新特性新内置的三个connector时,在本地创建图片[1]中的任务并进行数据打印时,控制台只打印了表schema,而没有按内置的datagen connector规则产生数据,请问可能是什么原因呢?谢谢解答!
[1] https://postimg.cc/PprT9XV6
参考回答:
tableResult.print需要有checkpoint
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372096
问题五:flink 1.11 upsert结果出错
各位大佬好,请教一个问题flink从Kafka读数,写入mysql,对mysql结果根据主键进行数据更新,看官网是支持“on DUPLICATE”的,但是在执行中报错是这个导致的语法问题。完整代码如下,是在linux下,直接python *.py执行的。请问下这个是不支持吗,还是怎么写呢!
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
trck_id VARCHAR,
score INT
) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json'
)
"""
sink="""
CREATE TABLE g_source_tab (
trck_id VARCHAR,
score INT,
PRIMARY KEY (trck_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g',
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source)
t_env.execute_sql(sink)
table_result1=t_env.execute_sql('''Insert into g_source_tab (trck_id
,score
) VALUES (select
trck_id,score from kafka_source_tab ) ON DUPLICATE KEY UPDATE score=score+1''')
table_result1.get_job_client().get_job_execution_result().result()
参考回答:
这个语法 Flink 还不支持的,官网上说的 Flink 的 JDBC connector 实现 幂等写入[1]的方式,就是有相同pk的数据在写入数据库时,翻译成数据库 upsert SQL的方式,这里说的语法是数据库的 SQL 语法 。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#idempotent-writes
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372099