请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。 问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。 问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create view as ...")却会报错。报错如下: Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, CASE C.parent_category_id WHEN 1 THEN '服饰鞋包' WHEN 2 THEN '家装家饰' WHEN 3 THEN '家电' WHEN 4 THEN '美妆' WHEN 5 THEN '母婴' WHEN 6 THEN '3C数码' WHEN 7 THEN '运动户外' WHEN 8 THEN '食品' ELSE '其他' END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)*来自志愿者整理的FLINK邮件归档
你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。