Apache Flink:不会触发流加入窗口-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Apache Flink:不会触发流加入窗口

flink小助手 2018-12-06 18:12:43 940

我正在尝试加入apache flink中的两个流来获得一些结果。

我的项目的当前状态是,我正在获取Twitter数据并将其映射到2元组,其中保存用户的语言和定义的时间窗口中的推文总和。我根据每种语言的推文数量和每种语言的转发数量来做这些。tweet / retweet聚合在其他进程中工作正常。

我现在想要在一个时间窗口中获得转推数量的百分比到所有推文的数量。

因此我使用以下代码:

Time windowSize = Time.seconds(15);

// Sum up tweets per language
DataStream> tweetsLangSum = tweets

    .flatMap(new TweetLangFlatMap())
    .keyBy(0)
    .timeWindow(windowSize)
    .sum(1);

// ---

// Get retweets out of all tweets per language
DataStream> retweetsLangMap = tweets

    .keyBy(new KeyByTweetPostId())
    .flatMap(new RetweetLangFlatMap());

// Sum up retweets per language
DataStream> retweetsLangSum = retweetsLangMap

    .keyBy(0)
    .timeWindow(windowSize)
    .sum(1);

// ---

tweetsLangSum.join(retweetsLangSum)

        .where(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                return tweet.f0;
            }
        })
        .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> tweet) throws Exception {
                return tweet.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(windowSize))
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
            @Override
            public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
                String lang = in1.f0;
                Double percentage = (double) in1.f1 / in2.f1;
                return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
            }
        })
        .print();

当我打印tweetsLangSum或retweetsLangSum输出似乎没问题。我的问题是我从来没有从连接中获得输出。有谁知道为什么?或者我在加入的第一步聚合错误时使用窗口函数?

Apache 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:18:39

    这可能是由不同时间语义的混合引起的。该KeyedStream.timeWindow()方法是基于配置的时间特性创建窗口操作符的快捷方式,即,如果启用事件时间则为事件时间窗口,否则为处理时间窗口。对于连接,您明确定义事件时间窗口。

    你启用了事件处理吗?

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    0 0
大数据
使用钉钉扫一扫加入圈子
+ 订阅

大数据计算实践乐园,近距离学习前沿技术

推荐文章
相似问题
推荐课程