大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink CEP 核心组件

CEP 的应用场景

CEP 的优势

超时事件提取

当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许制定超时处理程序。


FlinkCEP开发流程

DataSource中的数据转换为DataStream

定义Pattern,并将DataStream和Pattern组合转换为PatternStream。

PatternStream 经过 Select、Process 等算子转换为 DataStream

再次转换为 DataStream 经过处理后,Sink到目标库。

SELECT 方法:

SingleOutputStreamOperator<PayEvent> result =
    patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {
        return map.get("begin").get(0);
    }
}, new PatternSelectFunction<PayEvent, PayEvent>() {
    @Override
    public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
        return map.get("pay").get(0);
    }
});

对检测到的序列模式序列应用选择函数,对于每个模式序列,调用提供的 PatternSelectFunction,模式选择函数只能产生一个结果元素。

对超时的部分模式序列应用超时函数,对于每个部分模式序列,调用提供的 PatternTimeoutFunction,模式超时函数只能产生一个结果元素。

你可以在使用相同 OutputTag 进行 Select 操作 SingleOutputStreamOperator上获得SingleOutputStreamOperator生成的超时数据流。


非确定有限自动机

FlinkCEP 在运行时会将用户的逻辑转换为这样一个 NFA Graph(NFA对象)

所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行转换的过程。

上图中的状态机的功能,是检测二进制数是否含有偶数个0。从图上可以看出,输入只有1和0两种。

从S1状态开始,只有输入0才会转换到S2状态,同样S2状态下只有输入0才会转换到S1。所以,二进制输入完毕,如果满足最终状态,也就是最后停在S1状态,那么输入的二进制数就含有偶数个0。


CEP开发流程

FlinkCEP开发流程:


DataSource中数据转换为DataStream、Watermark、keyby

定义Pattern,并将DataStream和Pattern组合转换为PatternStream

PatternStream经过select、process等算子转换为 DataStream

再次转换为 DataStream 经过处理后,Sink到目标库

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>

案例1:恶意登录检测

找出5秒内,连续登录失败的账号

以下是数据:

new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)

整体思路

  • 获取到数据
  • 在数据源上做Watermark
  • 在Watermark上根据ID分组keyBy
  • 做出模式Pattern
  • 在数据流上进行模式匹配
  • 提取匹配成功的数据

编写代码

package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;


public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {

                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {

                            long maxTimestamp = Long.MAX_VALUE;
                            long maxOutOfOrderness = 500L;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));
        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });
        process.print();
        env.execute("FlinkCepLoginTest");
    }

}


class CepLoginBean {


    private Long userId;

    private String operation;

    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getOperation() {
        return operation;
    }

    public void setOperation(String operation) {
        this.operation = operation;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

运行结果

可以看到程序输出:

map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}

Process finished with exit code 0

运行截图如下所示:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
4月前
|
存储 数据采集 搜索推荐
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
本篇文章探讨了 Java 大数据在智慧文旅景区中的创新应用,重点分析了如何通过数据采集、情感分析与可视化等技术,挖掘游客情感需求,进而优化景区服务。文章结合实际案例,展示了 Java 在数据处理与智能推荐等方面的强大能力,为文旅行业的智慧化升级提供了可行路径。
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
|
4月前
|
数据采集 SQL 搜索推荐
大数据之路:阿里巴巴大数据实践——OneData数据中台体系
OneData是阿里巴巴内部实现数据整合与管理的方法体系与工具,旨在解决指标混乱、数据孤岛等问题。通过规范定义、模型设计与工具平台三层架构,实现数据标准化与高效开发,提升数据质量与应用效率。
大数据之路:阿里巴巴大数据实践——OneData数据中台体系
|
4月前
|
存储 SQL 分布式计算
大数据之路:阿里巴巴大数据实践——元数据与计算管理
本内容系统讲解了大数据体系中的元数据管理与计算优化。元数据部分涵盖技术、业务与管理元数据的分类及平台工具,并介绍血缘捕获、智能推荐与冷热分级等技术创新。元数据应用于数据标签、门户管理与建模分析。计算管理方面,深入探讨资源调度失衡、数据倾斜、小文件及长尾任务等问题,提出HBO与CBO优化策略及任务治理方案,全面提升资源利用率与任务执行效率。
|
2月前
|
人工智能 Cloud Native 算法
拔俗云原生 AI 临床大数据平台:赋能医学科研的开发者实践
AI临床大数据科研平台依托阿里云、腾讯云,打通医疗数据孤岛,提供从数据治理到模型落地的全链路支持。通过联邦学习、弹性算力与安全合规技术,实现跨机构协作与高效训练,助力开发者提升科研效率,推动医学AI创新落地。(238字)
|
4月前
|
存储 监控 大数据
大数据之路:阿里巴巴大数据实践——事实表设计
事实表是数据仓库核心,用于记录可度量的业务事件,支持高性能查询与低成本存储。主要包含事务事实表(记录原子事件)、周期快照表(捕获状态)和累积快照表(追踪流程)。设计需遵循粒度统一、事实可加性、一致性等原则,提升扩展性与分析效率。
|
5月前
|
存储 搜索推荐 算法
Java 大视界 -- Java 大数据在智慧文旅旅游线路规划与游客流量均衡调控中的应用实践(196)
本实践案例深入探讨了Java大数据技术在智慧文旅中的创新应用,聚焦旅游线路规划与游客流量调控难题。通过整合多源数据、构建用户画像、开发个性化推荐算法及流量预测模型,实现了旅游线路的精准推荐与流量的科学调控。在某旅游城市的落地实践中,游客满意度显著提升,景区流量分布更加均衡,充分展现了Java大数据技术在推动文旅产业智能化升级中的核心价值与广阔前景。
|
5月前
|
存储 分布式计算 算法
Java 大视界 -- Java 大数据在智能教育在线考试监考与作弊检测中的技术创新(193)
本文探讨了Java大数据技术在智能教育在线考试监考与作弊检测中的创新应用。随着在线考试的普及,作弊问题日益突出,传统监考方式难以应对。通过Java大数据技术,可实现考生行为分析、图像识别等多维度监控,提升作弊检测的准确性与效率。结合Hadoop与Spark等技术,系统能实时处理海量数据,构建智能监考体系,保障考试公平性,推动教育评价体系的数字化转型。
|
存储 分布式计算 大数据
大数据之路:阿里巴巴大数据实践——大数据领域建模综述
数据建模解决数据冗余、资源浪费、一致性缺失及开发低效等核心问题,通过分层设计提升性能10~100倍,优化存储与计算成本,保障数据质量并提升开发效率。相比关系数据库,数据仓库采用维度建模与列式存储,支持高效分析。阿里巴巴采用Kimball模型与分层架构,实现OLAP场景下的高性能计算与实时离线一体化。
|
3月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
216 14
|
4月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
168 0

热门文章

最新文章