Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇(三)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(四)CEP篇

2. 匹配事件的通用处理(process)

自1.8版本之后,Flink CEP引入了对于匹配事件的通用检测处理方式,那就是直接调用PatternStream的.process()方法,传入一个PatternProcessFunction。这看起来就像是我们熟悉的处理函数(process function),它也可以访问一个上下文(Context),进行更多的操作。所以PatternProcessFunction功能更加丰富、调用更加灵活,可以完全覆盖其他接口,也就成为了目前官方推荐的处理方式。事实上,PatternSelectFunction和PatternFlatSelectFunction在CEP内部执行时也会被转换成PatternProcessFunction。我们可以使用PatternProcessFunction将之前的代码重写如下:

// 3. 将匹配到的复杂事件选择出来,然后包装成报警信息输出
patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, 
Collector<String> out) throws Exception {
LoginEvent first = map.get("fails").get(0);
LoginEvent second = map.get("fails").get(1);
LoginEvent third = map.get("fails").get(2);
out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp + ", " + second.timestamp + ", " + third.timestamp);
}
}).print("warning");

可以看到,PatternProcessFunction中必须实现一个processMatch()方法;这个方法与之前的flatSelect()类似,只是多了一个上下文Context参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time);还可以调用.output()方法将数据输出到侧输出流。

4.3 处理超时事件

复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。检测处理的过程具体如下:

(1)如果当前事件符合模式匹配的条件,就接受该事件,保存到对应的Map中;
(2)如果在模式序列定义中,当前事件后面还应该有其他事件,就继续读取事件流进行检测;如果模式序列的定义已经全部满足,那么就成功检测到了一组匹配的复杂事件,调用PatternProcessFunction的processMatch()方法进行处理;
(3)如果当前事件不符合模式匹配的条件,就丢弃该事件;
(4)如果当前事件破坏了模式序列中定义的限制条件,比如不满足严格近邻要求,那么当前已检测的一组部分匹配事件都被丢弃,重新开始检测。
不过在有时间限制的情况下,有时我们会希望捕获并处理超时事件。CEP提供了处理超时事件的方法。

1. 使用PatternProcessFunction的侧输出流

在Flink CEP中,提供了一个专门捕捉超时的部分匹配事件的接口,叫作TimedOutPartialMatchHandler。这个接口需要实现一个processTimedOutMatch()方法,可以将超时的、已检测到的部分匹配事件放在一个Map中,作为方法的第一个参数;方法的第二个参数则是PatternProcessFunction的上下文Context。所以这个接口必须与PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。代码中的调用方式如下:

class MyPatternProcessFunction extends PatternProcessFunction<Event, String> 
implements TimedOutPartialMatchHandler<Event> {
    // 正常匹配事件的处理
@Override
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception{
        ...
    }
    // 超时部分匹配事件的处理
    @Override
    public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception{
        Event startEvent = match.get("start").get(0);
        OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){};
        ctx.output(outputTag, startEvent);
    }
}

我们在processTimedOutMatch()方法中定义了一个输出标签(OutputTag)。调用ctx.output()方法,就可以将超时的部分匹配事件输出到标签所标识的侧输出流了。

2. 使用PatternTimeoutFunction

上文提到的PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。此外,Flink CEP中也保留了早期简化版的PatternSelectFunction,它无法直接处理超时事件,不过我们可以通过调用PatternStream的.select()方法时多传入一个PatternTimeoutFunction参数来实现这一点。由于调用.select()方法后会得到唯一的DataStream,所以正常匹配事件和超时事件的处理结果不应该放在同一条流中。正常匹配事件的处理结果会进入转换后得到的DataStream,而超时事件的处理结果则会进入侧输出流;这个侧输出流需要另外传入一个侧输出标签(OutputTag)来指定。所以最终我们在调用PatternStream的.select()方法时需要传入三个参数:侧输出流标签(OutputTag),超时事件处理函数PatternTimeoutFunction,匹配事件提取函数PatternSelectFunction。下面是一个代码中的调用方式:

// 定义一个侧输出流标签,用于标识超时侧输出流
OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};
// 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
SingleOutputStreamOperator<String> resultStream = patternStream
.select(timeoutTag,
// 超时部分匹配事件的处理
        new PatternTimeoutFunction<Event, String>() {
            @Override
            public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
                Event event = pattern.get("start").get(0);
                return "超时:" + event.toString();
            }
        },
// 正常匹配事件的处理
        new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> pattern) throws Exception {
...
            }
        }
);
// 将正常匹配和超时部分匹配的处理结果流打印输出
resultStream.print("matched");
resultStream.getSideOutput(timeoutTag).print("timeout");

3. 应用实例

接下来我们看一个具体的应用场景。在电商平台中,最终创造收入和利润的是用户下单购买的环节。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如15分钟),如果下单后一段时间仍未支付,订单就会被取消。首先定义出要处理的数据类型。我们面对的是订单事件,主要包括用户对订单的创建(下单)和支付两种行为。因此可以定义POJO类OrderEvent如下,其中属性字段包括用户ID、订单ID、事件类型(操作类型)以及时间戳。

public class OrderEvent {
    public String userId;
    public String orderId;
    public String eventType;
    public Long timestamp;
    public OrderEvent() {
    }
    public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {
        this.userId = userId;
        this.orderId = orderId;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }
    @Override
    public String toString() {
        return "OrderEvent{" +
"userId='" + userId + '\'' +
                "orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

当前需求的重点在于对超时未支付的用户进行监控提醒,也就是需要检测有下单行为、但15分钟内没有支付行为的复杂事件。在下单和支付之间,可以有其他操作(比如对订单的修改),所以两者之间是宽松近邻关系。我们重点要处理的是超时的部分匹配事件。对原始的订单事件流按照订单ID进行分组,然后检测每个订单的“下单-支付”复杂事件,如果出现超时事件需要输出报警提示信息。整体代码实现如下:

public class OrderTimeoutDetect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 获取订单事件流,并提取时间戳、生成水位线
        KeyedStream<OrderEvent, String> stream = env
                .fromElements(
                        new OrderEvent("user_1", "order_1", "create", 1000L),
                        new OrderEvent("user_2", "order_2", "create", 2000L),
                        new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
                        new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
                        new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
                        new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<OrderEvent>() {
                                            @Override
                                            public long extractTimestamp(OrderEvent event, long l) {
                                                return event.timestamp;
                                            }
                                        }
                                )
                )
                .keyBy(order -> order.orderId);    // 按照订单ID分组
        // 1. 定义Pattern
        Pattern<OrderEvent, ?> pattern = Pattern
                .<OrderEvent>begin("create")    // 首先是下单事件
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return value.eventType.equals("create");
                    }
                })
                .followedBy("pay")    // 之后是支付事件;中间可以修改订单,宽松近邻
                .where(new SimpleCondition<OrderEvent>() {
                    @Override
                    public boolean filter(OrderEvent value) throws Exception {
                        return value.eventType.equals("pay");
                    }
                })
                .within(Time.minutes(15));    // 限制在15分钟之内
        // 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<OrderEvent> patternStream = CEP.pattern(stream, pattern);
        // 3. 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
        SingleOutputStreamOperator<String> payedOrderStream = patternStream.process(new OrderPayPatternProcessFunction());
        // 将正常匹配和超时部分匹配的处理结果流打印输出
        payedOrderStream.print("payed");
        payedOrderStream.getSideOutput(timeoutTag).print("timeout");
        env.execute();
    }
// 实现自定义的PatternProcessFunction,需实现TimedOutPartialMatchHandler接口
    public static class OrderPayPatternProcessFunction extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
        // 处理正常匹配事件
@Override
        public void processMatch(Map<String, List<OrderEvent>> match, Context ctx, Collector<String> out) throws Exception {
            OrderEvent payEvent = match.get("pay").get(0);
            out.collect("订单 " + payEvent.orderId + " 已支付!");
        }
// 处理超时未支付事件
        @Override
        public void processTimedOutMatch(Map<String, List<OrderEvent>> match, Context ctx) throws Exception {
            OrderEvent createEvent = match.get("create").get(0);
            ctx.output(new OutputTag<String>("timeout"){}, "订单 " + createEvent.orderId + " 超时未支付!用户为:" + createEvent.userId);
        }
    }
}

4.4 处理迟到数据

CEP主要处理的是先后发生的一组复杂事件,所以事件的顺序非常关键。在事件时间语义下,需要按照事件自身的时间戳来排序。这就有可能出现时间戳大的事件先到、时间戳小的事件后到的现象,也就是所谓的“乱序数据”或“迟到数据”。在Flink CEP中沿用了通过设置水位线(watermark)延迟来处理乱序数据的做法。不过水位线的延迟不可能完美处理所有迟到数据;如果不希望迟到数据丢掉,可以借鉴窗口的做法。Flink CEP同样提供了将迟到事件输出到侧输出流的方式:我们可以基于PatternStream直接调用.sideOutputLateData()方法,传入一个OutputTag,将迟到数据放入侧输出流另行处理。代码中调用方式如下:

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};
SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)    // 将迟到数据输出到侧输出流
    .select(    
// 处理正常匹配数据
        new PatternSelectFunction<Event, ComplexEvent>() {...}
    );
// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

可以看到,整个处理流程与窗口非常相似。经处理匹配数据得到结果数据流之后,可以调用.getSideOutput()方法来提取侧输出流,捕获迟到数据进行额外处理。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
机器学习/深度学习 运维 监控
实时异常检测实战:Flink+PAI 算法模型服务化架构设计
本文深入探讨了基于 Apache Flink 与阿里云 PAI 构建的实时异常检测系统。内容涵盖技术演进、架构设计、核心模块实现及金融、工业等多领域实战案例,解析流处理、模型服务化、状态管理等关键技术,并提供性能优化与高可用方案,助力企业打造高效智能的实时异常检测平台。
225 1
|
5月前
|
SQL 运维 Java
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
本文介绍了对Flink实时计算编译任务的Koupleless架构改造。为解决进程模型带来的响应慢、资源消耗大等问题,团队将进程模型改为线程模型,并借助Koupleless的类加载隔离能力实现版本和包的隔离。通过动态装配Plugin及其Classpath,以及Biz运行时仅对依赖Plugin可见的设计,大幅优化了编译任务的性能。结果表明,新架构使编译耗时降低50%,吞吐量提升5倍以上。
蚂蚁 Flink 实时计算编译任务 Koupleless 架构改造
|
7月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1119 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
6月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
169 5
|
6月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
160 4
|
存储 编译器 API
Flink-CEP之NFA编译器
NFAb 编译器的作用是将模式对象编译成NFA或者NFAFactory(用来创建多种NFA对象)。这个编译的过程,需要对模式进行拆分从而构建状态以及根据条件构建状态转换信息,最终根据构建好的状态集合来创建NFA。
2236 0
|
12月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
10月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3077 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
10月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
395 56

热门文章

最新文章