EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); executionEnvironment.setParallelism(3); Map<String,String> map = new HashMap<>(); map.put("table.exec.source.idle-timeout","1000 ms"); executionEnvironment.getConfig().setGlobalJobParameters(ParameterTool.fromMap(map)); executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(executionEnvironment, settings); streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1)); String catalogName = "cat1"; streamTableEnv.registerCatalog(catalogName,new GenericInMemoryCatalog(catalogName,"db1")); streamTableEnv.useCatalog(catalogName); streamTableEnv.executeSql("CREATE TABLE kafka_table2 (\n" + " user_id STRING,\n" + " order_amount DOUBLE,\n" + " log_ts TIMESTAMP(3),\n" + " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"+ ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'tp1',\n" + " 'properties.bootstrap.servers' = 'host1:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'csv'\n" + ")"); streamTableEnv.executeSql("select user_id," + "sum(order_amount) as amt," + "tumble_start(log_ts,INTERVAL '5' SECOND) as tumbleStart,"+ "tumble_end(log_ts,INTERVAL '5' SECOND) as tumbleEnd " + "from kafka_table2 group by user_id,tumble(log_ts,INTERVAL '5' SECOND)").print(); kafka 是三个分区, 测试时只向一个分区发送数据, 发现3个并行都没有生成水位线,求解?*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。