Flink-CEP之NFA

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: NFAb 模型包含两个阶段:第一个阶段是模式匹配阶段,在这个阶段它将会向最终态过渡并随着事件被选择而扩展缓冲区;第二个阶段是匹配提取阶段,该阶段发生在超时或者到达最终态时,将会从缓冲区中检索所产生的匹配。
NFAb
模型包含两个阶段:第一个阶段是模式匹配阶段,在这个阶段它将会向最终态过渡并随着事件被选择而扩展缓冲区;第二个阶段是匹配提取阶段,该阶段发生在超时或者到达最终态时,将会从缓冲区中检索所产生的匹配。

首先我们先来看在NFA中如何保存之前“计算”过的状态。这是什么意思呢?NFA作为一个建立在流处理之上的状态机,它以流中的事件作为输入并驱动状态的转换。然而,状态的转换并不是由当前被处理的事件独立决定的,它可能还依赖于之前已经计算过的事件的状态信息。Flink定义了类ComputationState来封装计算过的事件的状态,它包含如下这些属性:

NFA以一个队列保存了其状态机中当前所有的ComputationState。作为一个非确定性有限自动机的实现,NFA以process方法来接收流中的每个事件,进行处理后返回匹配的以及超时的事件序列。

方法最初首先遍历残留在队列中的ComputationState,它们通常是NFA处理之前的事件所建立的:

for (int i = 0; i < numberComputationStates; i++) {
    //队列中出队一个ComputationState
    ComputationState<T> computationState = computationStates.poll();

    final Collection<ComputationState<T>> newComputationStates;

    //如果当前遍历的ComputationState不是起始状态,同时窗口时间大于零
    //且待处理事件的时间戳跟当前遍历状态的时间戳之差大于窗口时间(说明当前ComputationState已滞后)
    if (!computationState.isStartState() &&
        windowTime > 0 &&
        timestamp - computationState.getStartTimestamp() >= windowTime) {

        //如果NFA被指定处理超时,则根据当前的ComputationState对象提取超时的匹配模式的事件序列,
        //并将超时的匹配模式的事件序列加入到timeoutResult用于最后的返回
        if (handleTimeout) {
            Collection<Map<String, T>> timeoutPatterns = extractPatternMatches(computationState);

            for (Map<String, T> timeoutPattern : timeoutPatterns) {
                timeoutResult.add(Tuple2.of(timeoutPattern, timestamp));
            }
        }

        //从共享缓冲区中移除已经超出窗口长度的ComputationState对应的事件信息
        sharedBuffer.release(computationState.getState(), computationState.getEvent(), 
            computationState.getTimestamp());
        sharedBuffer.remove(computationState.getState(), computationState.getEvent(), 
            computationState.getTimestamp());

        //不产生新的ComputationState
        newComputationStates = Collections.emptyList();
    } else {
        //基于当前的ComputationState以及待处理的事件计算新的ComputationState集合
        newComputationStates = computeNextStates(computationState, event, timestamp);
    }

    //遍历新产生的ComputationState
    for (ComputationState<T> newComputationState: newComputationStates) {
        //如果新的ComputationState到达最终态,则提取出其对应的事件匹配,并加入到待返回的结果集中
        if (newComputationState.isFinalState()) {
            Collection<Map<String, T>> matches = extractPatternMatches(newComputationState);
            result.addAll(matches);

            //移除已匹配的模式因为它们不再需要了
            sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), 
                newComputationState.getTimestamp());
            sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), 
                newComputationState.getTimestamp());
        } else {
            //否则将新的ComputationState加入队列待处理下一个事件时处理
            computationStates.add(newComputationState);
        }
    }
}

接下来还有一步是基于窗口长度删除共享缓冲区内的元素超出窗口长度的元素。

以上这个process方法时NFA处理事件的主干部分,本篇开头提到的两个阶段的逻辑实现处于额外的两个方法中。我们首先来看第一阶段,也就是如何实现模式匹配的细节。这个过程被封装在cpmputeNextStates方法中,它根据当前的ComputationState以及事件计算接下来的ComputationState集合。它会以栈结构来存储State,并依次出栈进行遍历(首先被加入栈然后立即出栈的是当前ComputationState的State),根据正在遍历的State,获取在该状态上支持的状态转换集合。

for (StateTransition<T> stateTransition: stateTransitions) {
    try {
        //如果状态转换不需要条件或当前事件满足过滤条件
        if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
            //枚举当前状态转换所对应的操作
            switch (stateTransition.getAction()) {
                case PROCEED:
                    //在状态转换图中“前进”到下一个目标状态
                    states.push(stateTransition.getTargetState());
                    break;
                case IGNORE:
                    //将当前计算状态加入结果集
                    resultingComputationStates.add(computationState);

                    //为共享缓冲区中对应的状态的引用计数器加一
                    sharedBuffer.lock(computationState.getState(), computationState.getEvent(), 
                        computationState.getTimestamp());
                    break;
                case TAKE:
                    final State<T> newState = stateTransition.getTargetState();
                    final DeweyNumber oldVersion;
                    final DeweyNumber newComputationStateVersion;
                    final State<T> previousState = computationState.getState();
                    final T previousEvent = computationState.getEvent();
                    final long previousTimestamp;
                    final long startTimestamp;

                    //如果当前的计算状态是起始状态,构建新的计算状态版本
                    if (computationState.isStartState()) {
                        oldVersion = new DeweyNumber(startEventCounter++);
                        newComputationStateVersion = oldVersion.addStage();
                        startTimestamp = timestamp;
                        previousTimestamp = -1L;
                    } else {
                        startTimestamp = computationState.getStartTimestamp();
                        previousTimestamp = computationState.getTimestamp();
                        oldVersion = computationState.getVersion();

                        //如果新状态即为计算状态,则将版本号加一,否则新状态为当前状态的下一个状态,则版本号新增一级
                        if (newState.equals(computationState.getState())) {
                            newComputationStateVersion = oldVersion.increase();
                        } else {
                            newComputationStateVersion = oldVersion.addStage();
                        }
                    }

                    //如果原状态是起始状态,则直接向共享缓冲区中加入新状态,原状态和新状态的关系可通过oldVersion来遍历
                    if (previousState.isStart()) {
                        sharedBuffer.put(newState, event, timestamp, oldVersion);
                    } else {
                        //如果原状态为非起始状态,则需要结合之前的状态等信息,以便校验
                        sharedBuffer.put(newState, event, timestamp, previousState,
                                        previousEvent, previousTimestamp, oldVersion);
                    }

                    //在共享缓冲区中为新状态增加引用计数
                    sharedBuffer.lock(newState, event, timestamp);

                    //以新状态构建新的计算状态实例并加入最终的结果集
                    resultingComputationStates.add(new ComputationState<T>(
                        newState, event, timestamp, newComputationStateVersion, startTimestamp));
                    break;
            }
        }
    } catch (Exception e) {
        throw new RuntimeException("Failure happened in filter function.", e);
    }
}

第二个阶段是模式提取阶段,在NFA中对应方法extractPatternMatches,其实最核心的逻辑是我们在上一篇分析SharedBuffer中的extractPatterns方法。通过extractPatterns获取到所有的路径之后,它会遍历所有的路径,并重新更改其存储结构同时为这些路径指定新的名称:

// generate the correct names from the collection of LinkedHashMultimaps
for (LinkedHashMultimap<State<T>, T> path: paths) {
    Map<String, T> resultPath = new HashMap<>();
    for (State<T> key: path.keySet()) {
        int counter = 0;
        Set<T> events = path.get(key);

            // we iterate over the elements in insertion order
        for (T event: events) {
            resultPath.put(
                events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
                        // copy the element so that the user can change it
                serializer.isImmutableType() ? event : serializer.copy(event)
            );
        }
    }

    result.add(resultPath);
}











原文发布时间为:2017-03-09

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
2月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
38 3
|
3月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
42 1
|
8月前
|
SQL Java API
Flink CEP在实时风控场景的落地与优化
Flink CEP在实时风控场景的落地与优化
|
9月前
|
运维 监控 数据处理
Flink的正则表达式--CEP规则引擎
Flink的正则表达式--CEP规则引擎
|
SQL 消息中间件 JSON
Flink CEP 新特性进展与在实时风控场景的落地
本次分享将会介绍 Flink 社区在 1.16 中对 Flink CEP 所做的增强与优化。
Flink CEP 新特性进展与在实时风控场景的落地
|
监控 安全 流计算
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(三)
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇
371 0
|
API 流计算
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(二)
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇
470 0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(二)
|
运维 分布式计算 监控
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(一)
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇
291 0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(一)
|
数据库 流计算
flink cep 跳过策略 AfterMatchSkipStrategy.skipPastLastEvent() 匹配过的不再匹配 碧坑指南
指示匹配过程后的跳过策略 今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配 即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西 flink cep pattern 代码
287 0
|
运维 监控 API
Flink CEP - Flink的复杂事件处理
1 Flink CEP 是什么 FlinkCEP - Flink的复杂事件处理。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分
236 0
Flink CEP - Flink的复杂事件处理