开发者社区> 问答> 正文

Apache Flink:不会触发流加入窗口

我正在尝试加入apache flink中的两个流来获得一些结果。

我的项目的当前状态是,我正在获取Twitter数据并将其映射到2元组,其中保存用户的语言和定义的时间窗口中的推文总和。我根据每种语言的推文数量和每种语言的转发数量来做这些。tweet / retweet聚合在其他进程中工作正常。

我现在想要在一个时间窗口中获得转推数量的百分比到所有推文的数量。

因此我使用以下代码:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream> tweetsLangSum = tweets

    .flatMap(new TweetLangFlatMap())
    .keyBy(0)
    .timeWindow(windowSize)
    .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream> retweetsLangMap = tweets

    .keyBy(new KeyByTweetPostId())
    .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream> retweetsLangSum = retweetsLangMap

    .keyBy(0)
    .timeWindow(windowSize)
    .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)

        .where(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                return tweet.f0;
            }
        })
        .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                return tweet.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(windowSize))
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
            @Override
            public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                String lang = in1.f0;
                Double percentage = (double) in1.f1 / in2.f1;
                return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
            }
        })
        .print();

当我打印tweetsLangSum或retweetsLangSum输出似乎没问题。我的问题是我从来没有从连接中获得输出。有谁知道为什么?或者我在加入的第一步聚合错误时使用窗口函数?

展开
收起
flink小助手 2018-12-06 18:12:43 2423 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    这可能是由不同时间语义的混合引起的。该KeyedStream.timeWindow()方法是基于配置的时间特性创建窗口操作符的快捷方式,即,如果启用事件时间则为事件时间窗口,否则为处理时间窗口。对于连接,您明确定义事件时间窗口。

    你启用了事件处理吗?

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    2019-07-17 23:18:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像