开发者社区> 问答> 正文

未生成水位线怎么处理?

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setParallelism(3); Map<String,String> map = new HashMap<>(); map.put("table.exec.source.idle-timeout","1000 ms"); executionEnvironment.getConfig().setGlobalJobParameters(ParameterTool.fromMap(map)); executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings); streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1)); String catalogName = "cat1"; streamTableEnv.registerCatalog(catalogName,new GenericInMemoryCatalog(catalogName,"db1")); streamTableEnv.useCatalog(catalogName); streamTableEnv.executeSql("CREATE TABLE kafka_table2 (\n" + " user_id STRING,\n" + " order_amount DOUBLE,\n" + " log_ts TIMESTAMP(3),\n" + " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"+ ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'tp1',\n" + " 'properties.bootstrap.servers' = 'host1:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'csv'\n" + ")"); streamTableEnv.executeSql("select user_id," + "sum(order_amount) as amt," + "tumble_start(log_ts,INTERVAL '5' SECOND) as tumbleStart,"+ "tumble_end(log_ts,INTERVAL '5' SECOND) as tumbleEnd " + "from kafka_table2 group by user_id,tumble(log_ts,INTERVAL '5' SECOND)").print(); kafka 是三个分区, 测试时只向一个分区发送数据, 发现3个并行都没有生成水位线,求解?*来自志愿者整理的flink邮件归档

展开
收起
JACKJACK 2021-12-08 10:52:21 858 0
1 条回答
写回答
取消 提交回答
  • 对的是我!

    看一下 WaterMarkAssigner节点 是否有 数据流入*来自志愿者整理的flink邮件归档

    2021-12-08 16:35:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载