开发者社区> 问答> 正文

flinkSQL快速构建流式应用过程中数据不准确了如何解决?

     请教下,之前我看了云邪大佬的基于flinkSQL快速构建流式应用的视频,兵按照其步骤进行了复现。过程中我遇到了两个问题。      问题一是当我用先在sql-client中创建kafkasource表,再用程序向kafka发送数据,统计出来的每小时全网购买数是对的;可如果把kafka数据清空,然后先启动程序向kafka发送用户行为数据,发送了一会后再创建kafkasource表并进行业务处理,此时得到的结果是:每小时全网购买数/partition的的数量,数据不准确了。我记得kafkasource读取标记的watermark是partition中最小的那个,为什么会出现这个问题。        问题二是我在sql-client中可以创建view,可我用程序写tableEnv.sqlUpdate("create view as ...")却会报错。报错如下: Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior,    CASE C.parent_category_id     WHEN 1 THEN '服饰鞋包'     WHEN 2 THEN '家装家饰'     WHEN 3 THEN '家电'     WHEN 4 THEN '美妆'     WHEN 5 THEN '母婴'     WHEN 6 THEN '3C数码'     WHEN 7 THEN '运动户外'     WHEN 8 THEN '食品'     ELSE '其他'   END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:67) at org.apache.flink.table.planner.delegation.ParserImpl$$Lambda$213/1241938981.get(Unknown Source) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:67) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at com.cmb.sql_train.myexample.MyUserBehaviorExample.main(MyUserBehaviorExample.java:51)*来自志愿者整理的FLINK邮件归档

展开
收起
玛丽莲梦嘉 2021-12-03 18:31:14 1344 0
1 条回答
写回答
取消 提交回答
  • 你可以将 kafka 并行度设置成等于 kafka partition 个数。这个方式肯定能 work,且不浪费 task 资源。*来自志愿者整理的FLINK邮件归档

    2021-12-03 18:50:47
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink中的两类新型状态存储 立即下载
HareQL:快速HBase查询工具的发展过程 立即下载
实战-如何基于HBase构建图片视频数据的统一存储检索方案 立即下载