我正在尝试加入apache flink中的两个流来获得一些结果。
我的项目的当前状态是,我正在获取Twitter数据并将其映射到2元组,其中保存用户的语言和定义的时间窗口中的推文总和。我根据每种语言的推文数量和每种语言的转发数量来做这些。tweet / retweet聚合在其他进程中工作正常。
我现在想要在一个时间窗口中获得转推数量的百分比到所有推文的数量。
因此我使用以下代码:
Time windowSize = Time.seconds(15);
// Sum up tweets per language
DataStream> tweetsLangSum = tweets
.flatMap(new TweetLangFlatMap())
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
// Get retweets out of all tweets per language
DataStream> retweetsLangMap = tweets
.keyBy(new KeyByTweetPostId())
.flatMap(new RetweetLangFlatMap());
// Sum up retweets per language
DataStream> retweetsLangSum = retweetsLangMap
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
tweetsLangSum.join(retweetsLangSum)
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.window(TumblingEventTimeWindows.of(windowSize))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
@Override
public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
String lang = in1.f0;
Double percentage = (double) in1.f1 / in2.f1;
return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
}
})
.print();
当我打印tweetsLangSum或retweetsLangSum输出似乎没问题。我的问题是我从来没有从连接中获得输出。有谁知道为什么?或者我在加入的第一步聚合错误时使用窗口函数?
这可能是由不同时间语义的混合引起的。该KeyedStream.timeWindow()方法是基于配置的时间特性创建窗口操作符的快捷方式,即,如果启用事件时间则为事件时间窗口,否则为处理时间窗口。对于连接,您明确定义事件时间窗口。
你启用了事件处理吗?
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。