streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
.assignTimestampsAndWatermarks(new CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
.connect(ruleConfigSource) .process(new MetricDataFilterProcessFunction()) .keyBy((KeySelector<Metric, MetricDataKey>) metric -> { MetricDataKey metricDataKey = new MetricDataKey(); metricDataKey.setDomain(metric.getDomain()); metricDataKey.setStationAliasCode(metric.getStaId()); metricDataKey.setEquipMK(metric.getEquipMK()); metricDataKey.setEquipID(metric.getEquipID()); metricDataKey.setMetric(metric.getMetric()); return metricDataKey; })
.window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1))) .apply(new RichWindowFunction<Metric, MetricDataList, MetricDataKey, TimeWindow>() { @Override public void apply(MetricDataKey tuple, TimeWindow window, Iterable input, Collector out) throws Exception { input.forEach(x->{ System.out.println("--->>>"+x); }); } })
我定义这个Topology中能正常执行keyBy,但是无法执行apply中的 System.out.println("--->>>"+x); *来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。