代码如下: //将json转化为LogBean SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());
KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() { @Override public long extractAscendingTimestamp(LogBean element) { LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); long eventTime = parse.toEpochSecond(ZoneOffset.of("+8")); System.out.println(eventTime); return eventTime; } }).map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(LogBean value) throws Exception { //获取用户id做分组 return new Tuple3<>(value.getNickname(), value.toString(), 1); } }).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } });
WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
window.sum(2).print();
在用sum前用的是reduce,在reduce可以打印出数据,但是reduce之后的结果数据始终没有,打印写文件都是空的*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。