问题一:flink-提交jar 隔断时间自己重启怎么办?
你好 参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html 这边读取mysql jdbc数据报错Exception in thread "main" org.apache.flink.table.api.TableException: Only insert statement is supported now. String a = "-- register a MySQL table 'users' in Flink SQL\n" + "CREATE TABLE MyUserTable (\n" + " id BIGINT\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***:3306/monitor',\n" + " 'table-name' = 't1',\n" + " 'username' = 'root',\n" + " 'password' = '***'\n" + ") ";
String b ="-- scan data from the JDBC table\n" + "SELECT id FROM MyUserTable\n";
tEnv.executeSql(a);
来自志愿者整理的flink邮件归档
参考回答:
没看懂问题。任务自动重启?失败了自然就重启了,restart策略设置的吧。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359041
问题二:flink-1.12.2 TM无法使用自定的serviceAccount访问configmap怎么办?
在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event time 进行 interval join,如何能使用 process time 呢?
参考回答:
您好:之前提交过一个关于这方面的issue,链接如下: http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html
目前看还是没有fix对应的issue。 报错如下:
目前看jira上的issue已经关闭了, 请确认是否修复。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359039?
问题三:flink sql count distonct 优化应该怎么办?
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE; 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true; set table.optimizer.distinct-agg.split.bucket-num=1024; 还需要对应的将SQL改写为两段式吗? 例如: 原SQL: SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,
对所需DISTINCT字段buy_id模1024自动打散后,SQL: SELECT day, SUM(cnt) total FROM ( SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day, MOD(buy_id, 1024)) GROUP BY day
还是flink会帮我自动改写SQL,我不用关心?
另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
来自志愿者整理的flink邮件归档
参考回答:
我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window agg支持这个参数了。可以期待下。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359042
问题四:Flink 消费kafka ,怎么写ORC文件?
【现状如下】 Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
据了解,flink写orc的桶分配策略[1],有两种:
一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]
test/realtime/ └── 2021-03-23--07 ├── part-0-0.orc ├── part-0-1.orc └── 2021-03-23--08 ├── part-0-0.orc ├── part-0-1.orc
一种是将所有部分文件放在一个目录下:
test/realtime/ ├── part-0-0.orc ├── part-0-1.orc ├── part-0-2.orc ├── part-0-3.orc
【问题】
最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:
hive> show partitions table_demo;
OK dt=1616455800000 dt=1616457600000 dt=1616459400000 dt=1616461200001 dt=1616463000001
Time taken: 0.134 seconds, Fetched: 5 row(s)
因此希望每个orc文件的所在目录名都是dt=时间戳
的格式:
http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png
用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。
不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了
来自志愿者整理的flink邮件归档
参考回答:
官网有这么一段:我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
链接: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359046?spm=a2c6h.13262185.0.0.677f6c07q66JNp
问题五:interval join 如何用 process time?
在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event time 进行 interval join,如何能使用 process time 呢?
来自志愿者整理的flink邮件归档
参考回答:
你好,DataStream API 中的 Interval Join 目前还不支持 process time,参考 [1]. 不过如果不要去严格准确的 process time 的话,是否可以在 Join 之前把 process time 用某个字段带出来,当 event time 用?
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359047?spm=a2c6h.13262185.0.0.677f6c07q66JNp