问题一:关于Watermark的使用调试问题
想问下社区,watermark必须加在数据源上吗?顺便想问下一般用什么方式来调watermark,我自己本地有发现加在map
function之后,watermark会自动重置为默认值的情况。
谢谢!*来自志愿者整理的flink邮件归档
参考回答:
可以中途产生,走这个接口
org.apache.flink.streaming.api.datastream.DataStream#assignTimestampsAndWatermarks(org.apache.flink.api.common.eventtime.WatermarkStrategy )
麻烦贴一下你加 watermark 的代码和 pipeline 看一下啥情况
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359897?spm=a2c6h.13066354.0.0.5a8f6992GOUQsh
问题二:消息队列量级特别如何优化消费?
hi, 由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!*来自志愿者整理的flink邮件归档
参考回答:
被压严重一般是sink 效率低或者计算过程有瓶颈,被压跟数据源数据多少没什么关系。你可以在web 界面查看哪个算子导致的,然后优化就可以了
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359536?spm=a2c6h.13262185.0.0.133c39c0Clkic3
问题三:Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问?
Hi 社区。
Flink 1.12.1
现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。
insert into table_a select id,udf(a),b,c from table_b;
发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
- ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
- 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗?
== Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, temp_table]], fields=[id...]) Stage 3 : Operator content : ChangelogNormalize(key=[id]) ship_strategy : HASH Stage 4 : Operator content : Calc(select=[...]) ship_strategy : FORWARD Stage 5 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[id...]) ship_strategy : FORWARD ```*来自志愿者整理的flink邮件归档
参考回答:
- 对于 upsert-kafka 会默认加上 ChangelogNormalize
- ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
forward。[1]:
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359539?spm=a2c6h.13262185.0.0.133c39c0Clkic3
问题四:状态恢复参数顺序 -s怎么办?
../bin/flink run -s path -c class test.jar 这里面的-s 必须在最前面么,我换成 ../bin/flink run -c class test.jar -s path 不生效。*来自志愿者整理的flink邮件归档
参考回答:
你放在jar包后就当作jar的参数了 ,你可以试试这样在你的main中获取参数 s 就是你的path。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359540?spm=a2c6h.13262185.0.0.133c39c0Clkic3
问题五:Flink1.12 如何使用代码提交Batch的Sql?
我们知道如果在1.12里使用Table API来提交Batch的作业,比如: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
但是,如果提交Sql作业的话: StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Table result = tableEnv.sqlQuery(...); 文档里也找不到如何使用StreamTableEnvironment 来跑Batch的SQL,又或者使用BatchTableEnvironment?
感谢各位提供思路!*来自志愿者整理的flink邮件归档
参考回答:
Hi shougou.
你要找的是不是这个[1]
// ******************// BLINK BATCH QUERY// ******************import
org.apache.flink.table.api.EnvironmentSettings;import
org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment
bbTableEnv = TableEnvironment.create(bbSettings);
[1]
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359546?spm=a2c6h.13262185.0.0.133c39c0Clkic3