开发者社区> 问答> 正文

Apache Flink:Wierd FlatMap行为

"我正在向Flink提取数据流。对于此数据的每个“实例”,我都有一个时间戳。我可以检测到我从中获取数据的机器是“生产”还是“不生产”,这是通过自定义平面地图功能来完成的,该功能位于其自己的静态类中。

我想计算机器生产/不生产的时间。我目前的方法是在两个普通列表中收集生产和非生产时间戳。对于数据的每个“实例”,我通过从最早的时间戳中减去最新的时间戳来计算当前的生产/非生产持续时间。不过,这给了我不正确的结果。当生产状态从生产变为非生产时,我清除生产的时间戳列表,反之亦然,这样如果生产再次开始,则持续时间从零开始。

我查看了两个列表,我收集了相应的时间戳,我看到了我不明白的事情。我的假设是,只要机器“生成”,生产时间戳列表中的第一个时间戳保持不变,而每个新数据实例将新时间戳添加到列表中。显然,这个假设是错误的,因为我在列表中看似似乎是随机的时间戳。不过,它们仍然是正确的。

这是我的flatmap函数的代码:

public static class ImaginePaperDataConverterRich extends RichFlatMapFunction {

private static final long serialVersionUID = 4736981447434827392L;
private transient ValueState<ProductionState> stateOfProduction;
SimpleDateFormat dateFormat = new SimpleDateFormat(""dd.MM.yyyy HH:mm:ss.SS"");
DateFormat timeDiffFormat = new SimpleDateFormat(""dd HH:mm:ss.SS"");
String timeDiffString = ""00 00:00:00.000"";
List<String> productionTimestamps = new ArrayList<>();
List<String> nonProductionTimestamps = new ArrayList<>();

public String calcProductionTime(List<String> timestamps) {
    if (!timestamps.isEmpty()) {
        try {
            Date firstDate = dateFormat.parse(timestamps.get(0));
            Date lastDate = dateFormat.parse(timestamps.get(timestamps.size()-1));
            long timeDiff = lastDate.getTime() - firstDate.getTime();

            if (timeDiff < 0) {
                System.out.println(""Something weird happened. Maybe EOF."");
                return timeDiffString;
            }

            timeDiffString = String.format(""%02d %02d:%02d:%02d.%02d"",
                TimeUnit.MILLISECONDS.toDays(timeDiff),
                TimeUnit.MILLISECONDS.toHours(timeDiff)   % TimeUnit.HOURS.toHours(1),
                TimeUnit.MILLISECONDS.toMinutes(timeDiff) % TimeUnit.HOURS.toMinutes(1),
                TimeUnit.MILLISECONDS.toSeconds(timeDiff) % TimeUnit.MINUTES.toSeconds(1),
                TimeUnit.MILLISECONDS.toMillis(timeDiff)  % TimeUnit.SECONDS.toMillis(1));

        } catch (ParseException e) {
            e.printStackTrace();
        }
        System.out.println(""State duration: "" + timeDiffString);
    }
    return timeDiffString;
}

@Override
public void open(Configuration config) {
    ValueStateDescriptor<ProductionState> descriptor = new ValueStateDescriptor<>(
        ""stateOfProduction"",
        TypeInformation.of(new TypeHint<ProductionState>() {}),
        ProductionState.NOT_PRODUCING);
        stateOfProduction = getRuntimeContext().getState(descriptor);
}

@Override
public void flatMap(ImaginePaperData ImaginePaperData, Collector<String> output) throws Exception {
    List<String> warnings = new ArrayList<>();
    JSONObject jObject = new JSONObject();
    String productionTime = ""0"";
    String nonProductionTime = ""0"";

    // Data analysis
    if (stateOfProduction == null || stateOfProduction.value() == ProductionState.NOT_PRODUCING && ImaginePaperData.actSpeedCl > 60.0) {
        stateOfProduction.update(ProductionState.PRODUCING);
    } else if (stateOfProduction.value() == ProductionState.PRODUCING && ImaginePaperData.actSpeedCl < 60.0) {
        stateOfProduction.update(ProductionState.NOT_PRODUCING);
    }

    if(stateOfProduction.value() == ProductionState.PRODUCING) {
        if (!nonProductionTimestamps.isEmpty()) {
            System.out.println(""Production has started again, non production timestamps cleared"");
            nonProductionTimestamps.clear();
        }
        productionTimestamps.add(ImaginePaperData.timestamp);

        System.out.println(productionTimestamps);
        productionTime = calcProductionTime(productionTimestamps);
    } else {
        if(!productionTimestamps.isEmpty()) {
            System.out.println(""Production has stopped, production timestamps cleared"");
            productionTimestamps.clear();
        }
        nonProductionTimestamps.add(ImaginePaperData.timestamp);
        warnings.add(""Production has stopped."");

        System.out.println(nonProductionTimestamps);
        //System.out.println(""Production stopped"");
        nonProductionTime = calcProductionTime(nonProductionTimestamps);
    }

// The rest is just JSON stuff
我是否可能必须在ListState中保存这两个时间戳列表?

编辑:因为另一个用户问,这是我得到的数据。

{'szenario': 'machine01', 'timestamp': '31.10.2018 09:18:39.432069', 'data': {1: 100.0, 2: 100.0, 101: 94.0, 102: 120.0, 103: 65.0}}
我期望的行为是我的flink程序收集两个列表productionTimestamps和nonProductionTimestamps中的时间戳。然后我希望我的calcProductionTime方法从第一个时间戳中减去列表中的最后一个时间戳,以获得我第一次检测到机器“生成”/“未生成”和停止“生成”/“的时间之间的持续时间。不生产”。"

展开
收起
flink小助手 2018-11-28 16:14:39 3208 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "我发现“看似随机”时间戳的原因是Apache Flink的并行执行。当并行度设置为> 1时,不再保证事件的顺序。

    我的快速解决方法是将程序的并行度设置为1,这可以保证事件的顺序,据我所知。"

    2019-07-17 23:16:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Apache Flink技术进阶 立即下载
Apache Spark: Cloud and On-Prem 立即下载
Hybrid Cloud and Apache Spark 立即下载

相关镜像