开发者社区> 问答> 正文

为什么窗口中的数据无法发送到下游?

代码如下:         //将json转化为LogBean       SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());

      KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() {             @Override             public long extractAscendingTimestamp(LogBean element) {                 LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));                 long eventTime = parse.toEpochSecond(ZoneOffset.of("+8"));                 System.out.println(eventTime);                 return eventTime;             }         }).map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() {             @Override             public Tuple3<String, String, Integer> map(LogBean value) throws Exception {                 //获取用户id做分组                 return new Tuple3<>(value.getNickname(), value.toString(), 1);             }         }).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {             @Override             public String getKey(Tuple3<String, String, Integer> value) throws Exception {                 return value.f0;             }         });

        WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));

        window.sum(2).print();

在用sum前用的是reduce,在reduce可以打印出数据,但是reduce之后的结果数据始终没有,打印写文件都是空的*来自志愿者整理的flink邮件归档

展开
收起
船长的小螺号 2021-12-03 10:21:00 957 0
1 条回答
写回答
取消 提交回答
  • 建议检查下Watermark,打印出来看看是不是合法的。btw,这代码缩紧有点尴尬。*来自志愿者整理的FLINK邮件归档

    2021-12-03 10:48:37
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
探索连接的最后十秒”落时”的网关 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载