Flink-CEP之NFA

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: NFAb 模型包含两个阶段:第一个阶段是模式匹配阶段,在这个阶段它将会向最终态过渡并随着事件被选择而扩展缓冲区;第二个阶段是匹配提取阶段,该阶段发生在超时或者到达最终态时,将会从缓冲区中检索所产生的匹配。
NFAb
模型包含两个阶段:第一个阶段是模式匹配阶段,在这个阶段它将会向最终态过渡并随着事件被选择而扩展缓冲区;第二个阶段是匹配提取阶段,该阶段发生在超时或者到达最终态时,将会从缓冲区中检索所产生的匹配。

首先我们先来看在NFA中如何保存之前“计算”过的状态。这是什么意思呢?NFA作为一个建立在流处理之上的状态机,它以流中的事件作为输入并驱动状态的转换。然而,状态的转换并不是由当前被处理的事件独立决定的,它可能还依赖于之前已经计算过的事件的状态信息。Flink定义了类ComputationState来封装计算过的事件的状态,它包含如下这些属性:

NFA以一个队列保存了其状态机中当前所有的ComputationState。作为一个非确定性有限自动机的实现,NFA以process方法来接收流中的每个事件,进行处理后返回匹配的以及超时的事件序列。

方法最初首先遍历残留在队列中的ComputationState,它们通常是NFA处理之前的事件所建立的:

for (int i = 0; i < numberComputationStates; i++) {
    //队列中出队一个ComputationState
    ComputationState<T> computationState = computationStates.poll();

    final Collection<ComputationState<T>> newComputationStates;

    //如果当前遍历的ComputationState不是起始状态,同时窗口时间大于零
    //且待处理事件的时间戳跟当前遍历状态的时间戳之差大于窗口时间(说明当前ComputationState已滞后)
    if (!computationState.isStartState() &&
        windowTime > 0 &&
        timestamp - computationState.getStartTimestamp() >= windowTime) {

        //如果NFA被指定处理超时,则根据当前的ComputationState对象提取超时的匹配模式的事件序列,
        //并将超时的匹配模式的事件序列加入到timeoutResult用于最后的返回
        if (handleTimeout) {
            Collection<Map<String, T>> timeoutPatterns = extractPatternMatches(computationState);

            for (Map<String, T> timeoutPattern : timeoutPatterns) {
                timeoutResult.add(Tuple2.of(timeoutPattern, timestamp));
            }
        }

        //从共享缓冲区中移除已经超出窗口长度的ComputationState对应的事件信息
        sharedBuffer.release(computationState.getState(), computationState.getEvent(), 
            computationState.getTimestamp());
        sharedBuffer.remove(computationState.getState(), computationState.getEvent(), 
            computationState.getTimestamp());

        //不产生新的ComputationState
        newComputationStates = Collections.emptyList();
    } else {
        //基于当前的ComputationState以及待处理的事件计算新的ComputationState集合
        newComputationStates = computeNextStates(computationState, event, timestamp);
    }

    //遍历新产生的ComputationState
    for (ComputationState<T> newComputationState: newComputationStates) {
        //如果新的ComputationState到达最终态,则提取出其对应的事件匹配,并加入到待返回的结果集中
        if (newComputationState.isFinalState()) {
            Collection<Map<String, T>> matches = extractPatternMatches(newComputationState);
            result.addAll(matches);

            //移除已匹配的模式因为它们不再需要了
            sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), 
                newComputationState.getTimestamp());
            sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), 
                newComputationState.getTimestamp());
        } else {
            //否则将新的ComputationState加入队列待处理下一个事件时处理
            computationStates.add(newComputationState);
        }
    }
}

接下来还有一步是基于窗口长度删除共享缓冲区内的元素超出窗口长度的元素。

以上这个process方法时NFA处理事件的主干部分,本篇开头提到的两个阶段的逻辑实现处于额外的两个方法中。我们首先来看第一阶段,也就是如何实现模式匹配的细节。这个过程被封装在cpmputeNextStates方法中,它根据当前的ComputationState以及事件计算接下来的ComputationState集合。它会以栈结构来存储State,并依次出栈进行遍历(首先被加入栈然后立即出栈的是当前ComputationState的State),根据正在遍历的State,获取在该状态上支持的状态转换集合。

for (StateTransition<T> stateTransition: stateTransitions) {
    try {
        //如果状态转换不需要条件或当前事件满足过滤条件
        if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
            //枚举当前状态转换所对应的操作
            switch (stateTransition.getAction()) {
                case PROCEED:
                    //在状态转换图中“前进”到下一个目标状态
                    states.push(stateTransition.getTargetState());
                    break;
                case IGNORE:
                    //将当前计算状态加入结果集
                    resultingComputationStates.add(computationState);

                    //为共享缓冲区中对应的状态的引用计数器加一
                    sharedBuffer.lock(computationState.getState(), computationState.getEvent(), 
                        computationState.getTimestamp());
                    break;
                case TAKE:
                    final State<T> newState = stateTransition.getTargetState();
                    final DeweyNumber oldVersion;
                    final DeweyNumber newComputationStateVersion;
                    final State<T> previousState = computationState.getState();
                    final T previousEvent = computationState.getEvent();
                    final long previousTimestamp;
                    final long startTimestamp;

                    //如果当前的计算状态是起始状态,构建新的计算状态版本
                    if (computationState.isStartState()) {
                        oldVersion = new DeweyNumber(startEventCounter++);
                        newComputationStateVersion = oldVersion.addStage();
                        startTimestamp = timestamp;
                        previousTimestamp = -1L;
                    } else {
                        startTimestamp = computationState.getStartTimestamp();
                        previousTimestamp = computationState.getTimestamp();
                        oldVersion = computationState.getVersion();

                        //如果新状态即为计算状态,则将版本号加一,否则新状态为当前状态的下一个状态,则版本号新增一级
                        if (newState.equals(computationState.getState())) {
                            newComputationStateVersion = oldVersion.increase();
                        } else {
                            newComputationStateVersion = oldVersion.addStage();
                        }
                    }

                    //如果原状态是起始状态,则直接向共享缓冲区中加入新状态,原状态和新状态的关系可通过oldVersion来遍历
                    if (previousState.isStart()) {
                        sharedBuffer.put(newState, event, timestamp, oldVersion);
                    } else {
                        //如果原状态为非起始状态,则需要结合之前的状态等信息,以便校验
                        sharedBuffer.put(newState, event, timestamp, previousState,
                                        previousEvent, previousTimestamp, oldVersion);
                    }

                    //在共享缓冲区中为新状态增加引用计数
                    sharedBuffer.lock(newState, event, timestamp);

                    //以新状态构建新的计算状态实例并加入最终的结果集
                    resultingComputationStates.add(new ComputationState<T>(
                        newState, event, timestamp, newComputationStateVersion, startTimestamp));
                    break;
            }
        }
    } catch (Exception e) {
        throw new RuntimeException("Failure happened in filter function.", e);
    }
}

第二个阶段是模式提取阶段,在NFA中对应方法extractPatternMatches,其实最核心的逻辑是我们在上一篇分析SharedBuffer中的extractPatterns方法。通过extractPatterns获取到所有的路径之后,它会遍历所有的路径,并重新更改其存储结构同时为这些路径指定新的名称:

// generate the correct names from the collection of LinkedHashMultimaps
for (LinkedHashMultimap<State<T>, T> path: paths) {
    Map<String, T> resultPath = new HashMap<>();
    for (State<T> key: path.keySet()) {
        int counter = 0;
        Set<T> events = path.get(key);

            // we iterate over the elements in insertion order
        for (T event: events) {
            resultPath.put(
                events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
                        // copy the element so that the user can change it
                serializer.isImmutableType() ? event : serializer.copy(event)
            );
        }
    }

    result.add(resultPath);
}











原文发布时间为:2017-03-09

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
65 0
|
1月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
43 0
|
1月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
53 0
|
3月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
44 0
|
6月前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
6月前
|
资源调度 监控 Java
实时计算 Flink版产品使用合集之如何使用CEP库进行数据处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
67 3
|
6月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
运维 监控 数据处理
Flink的正则表达式--CEP规则引擎
Flink的正则表达式--CEP规则引擎
|
Apache 流计算
Flink CEP 在抖音电商的业务实践|电商行业实践专栏上线
Flink-learning 学训平台和电商行业实践专栏来啦!
6468 0
Flink CEP 在抖音电商的业务实践|电商行业实践专栏上线