Flink之CEP案例分析-网络攻击检测

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量。

我们将检测的结果分为三个等级:

  • 正常:流量在预设的正常范围内;
  • 警告:某数据中心在10秒内连续两次上报的流量超过认定的正常值;
  • 报警:某数据中心在30秒内连续两次匹配警告;

首先,我们构建source,这里我们选择的是并行source,因此需要继承RichParallelSourceFunction类。所有的数据通过模拟器随机生成,其中数据中心编号为整型且取值范围为[0, 10),数据生成的事件间隔由PAUSE常量指定,默认为100毫秒:

//parallel source
DataStream<MonitorEvent> inputEventStream = env.addSource(
    new MonitorEventSource(
        MAX_DATACENTER_ID,
        STREAM_STD,
        STREAM_MEAN,
        PAUSE
    )
).assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());

下面,我们来构建警告模式,按照我们设定的警告等级,其模式定义如下:

Pattern<MonitorEvent, ?> warningPattern = Pattern.<MonitorEvent>begin("first")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .next("second")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .within(Time.seconds(10));

根据该模式构建模式流:

PatternStream<MonitorEvent> warningPatternStream =
    CEP.pattern(inputEventStream.keyBy("dataCenterId"), warningPattern);

在警告的模式流中筛选出配对的警告事件对,生成警告事件对象流(告警事件对象会算出,前后两个匹配的流量事件的平均值):

DataStream<NetworkStreamWarning> warnings = warningPatternStream.select(
    (Map<String, MonitorEvent> pattern) -> {
        NetworkStreamEvent first = (NetworkStreamEvent) pattern.get("first");
        NetworkStreamEvent second = (NetworkStreamEvent) pattern.get("second");

        return new NetworkStreamWarning(first.getDataCenterId(),
            (first.getStream() + second.getStream()) / 2);
    }
);

按照设定的等级,告警模式定义如下:

Pattern<NetworkStreamWarning, ?> alertPattern = Pattern.<NetworkStreamWarning>
    begin("first").next("second").within(Time.seconds(30));

在警告事件流中应用告警模式,得到告警模式流:

PatternStream<NetworkStreamWarning> alertPatternStream = CEP.pattern(warnings.keyBy
    ("dataCenterId"), alertPattern);

在告警模式流中匹配警告模式对,如果模式对中第一个警告对象的平均流量值小于第二个警告对象的平均流量值,则构建告警对象并输出该对象从而形成告警流:

DataStream<NetworkStreamAlert> alerts = alertPatternStream.flatSelect(
    (Map<String, NetworkStreamWarning> pattern, Collector<NetworkStreamAlert> out) -> {
        NetworkStreamWarning first = pattern.get("first");
        NetworkStreamWarning second = pattern.get("second");

        //first avg < second avg
        if (first.getAverageStream() < second.getAverageStream()) {
            out.collect(new NetworkStreamAlert(first.getDataCenterId()));
        }
    }
);

最终,sink到控制台:

warnings.print();
alerts.print();

从上面的代码段可见,CEP的关键是定义合适的模式。关于模式的相关的API,我们之前已进行过分析。为了节省篇幅,本文只列出了核心代码片段。

需要注意的是,因为包含Java 8的lambdas,当你使用javac作为编译器时,将会得到错误提示:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Map' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to 
use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1331)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1317)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:347)
at org.apache.flink.cep.PatternStream.select(PatternStream.java:81)
at com.diveintoapacheflink.chapter11.NetworkAttackMonitor.main(NetworkAttackMonitor.java:55)
at ...

解决方案是使用Eclipse JDT来编译代码。


原文发布时间为:2017-03-01
本文作者:vinoYang
本文来自云栖社区合作伙伴 CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
25天前
|
机器学习/深度学习 算法 数据安全/隐私保护
基于GRU网络的MQAM调制信号检测算法matlab仿真,对比LSTM
本研究基于MATLAB 2022a,使用GRU网络对QAM调制信号进行检测。QAM是一种高效调制技术,广泛应用于现代通信系统。传统方法在复杂环境下性能下降,而GRU通过门控机制有效提取时间序列特征,实现16QAM、32QAM、64QAM、128QAM的准确检测。仿真结果显示,GRU在低SNR下表现优异,且训练速度快,参数少。核心程序包括模型预测、误检率和漏检率计算,并绘制准确率图。
86 65
基于GRU网络的MQAM调制信号检测算法matlab仿真,对比LSTM
|
4月前
|
人工智能 边缘计算 物联网
蜂窝网络未来发展趋势的分析
蜂窝网络未来发展趋势的分析
159 2
|
7天前
|
机器学习/深度学习 存储 算法
基于MobileNet深度学习网络的活体人脸识别检测算法matlab仿真
本内容主要介绍一种基于MobileNet深度学习网络的活体人脸识别检测技术及MQAM调制类型识别方法。完整程序运行效果无水印,需使用Matlab2022a版本。核心代码包含详细中文注释与操作视频。理论概述中提到,传统人脸识别易受非活体攻击影响,而MobileNet通过轻量化的深度可分离卷积结构,在保证准确性的同时提升检测效率。活体人脸与非活体在纹理和光照上存在显著差异,MobileNet可有效提取人脸高级特征,为无线通信领域提供先进的调制类型识别方案。
|
4月前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
19天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
|
6天前
|
机器学习/深度学习 数据采集 算法
基于yolov2和googlenet网络的疲劳驾驶检测算法matlab仿真
本内容展示了基于深度学习的疲劳驾驶检测算法,包括算法运行效果预览(无水印)、Matlab 2022a 软件版本说明、部分核心程序(完整版含中文注释与操作视频)。理论部分详细阐述了疲劳检测原理,通过对比疲劳与正常状态下的特征差异,结合深度学习模型提取驾驶员面部特征变化。具体流程包括数据收集、预处理、模型训练与评估,使用数学公式描述损失函数和推理过程。课题基于 YOLOv2 和 GoogleNet,先用 YOLOv2 定位驾驶员面部区域,再由 GoogleNet 分析特征判断疲劳状态,提供高准确率与鲁棒性的检测方法。
|
3月前
|
存储 安全 物联网
浅析Kismet:无线网络监测与分析工具
Kismet是一款开源的无线网络监测和入侵检测系统(IDS),支持Wi-Fi、Bluetooth、ZigBee等协议,具备被动监听、实时数据分析、地理定位等功能。广泛应用于安全审计、网络优化和频谱管理。本文介绍其安装配置、基本操作及高级应用技巧,帮助用户掌握这一强大的无线网络安全工具。
164 9
浅析Kismet:无线网络监测与分析工具
|
3月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
655 2
探索Flink动态CEP:杭州银行的实战案例
|
3月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
316 27
|
3月前
|
数据采集 机器学习/深度学习 人工智能
基于AI的网络流量分析:构建智能化运维体系
基于AI的网络流量分析:构建智能化运维体系
509 13

热门文章

最新文章