Flink之CEP案例分析-网络攻击检测

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

上一篇我们介绍了Flink CEP的API,这一篇我们将以结合一个案例来练习使用CEP的API编写应用程序,以强化对API的理解。所选取的案例是对网络遭受的潜在攻击进行检测并给出告警。当下互联网安全形势仍然严峻,网络攻击屡见不鲜且花样众多,这里我们以DDOS(分布式拒绝服务攻击)产生的流入流量来作为遭受攻击的判断依据。

假定一家云服务提供商,有多个跨地区的数据中心,每个数据中心会定时向监控中心上报其瞬时流量。

我们将检测的结果分为三个等级:

  • 正常:流量在预设的正常范围内;
  • 警告:某数据中心在10秒内连续两次上报的流量超过认定的正常值;
  • 报警:某数据中心在30秒内连续两次匹配警告;

首先,我们构建source,这里我们选择的是并行source,因此需要继承RichParallelSourceFunction类。所有的数据通过模拟器随机生成,其中数据中心编号为整型且取值范围为[0, 10),数据生成的事件间隔由PAUSE常量指定,默认为100毫秒:

//parallel source
DataStream<MonitorEvent> inputEventStream = env.addSource(
    new MonitorEventSource(
        MAX_DATACENTER_ID,
        STREAM_STD,
        STREAM_MEAN,
        PAUSE
    )
).assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());

下面,我们来构建警告模式,按照我们设定的警告等级,其模式定义如下:

Pattern<MonitorEvent, ?> warningPattern = Pattern.<MonitorEvent>begin("first")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .next("second")
    .subtype(NetworkStreamEvent.class)
    .where(evt -> evt.getStream() >= STREAM_THRESHOLD)
    .within(Time.seconds(10));

根据该模式构建模式流:

PatternStream<MonitorEvent> warningPatternStream =
    CEP.pattern(inputEventStream.keyBy("dataCenterId"), warningPattern);

在警告的模式流中筛选出配对的警告事件对,生成警告事件对象流(告警事件对象会算出,前后两个匹配的流量事件的平均值):

DataStream<NetworkStreamWarning> warnings = warningPatternStream.select(
    (Map<String, MonitorEvent> pattern) -> {
        NetworkStreamEvent first = (NetworkStreamEvent) pattern.get("first");
        NetworkStreamEvent second = (NetworkStreamEvent) pattern.get("second");

        return new NetworkStreamWarning(first.getDataCenterId(),
            (first.getStream() + second.getStream()) / 2);
    }
);

按照设定的等级,告警模式定义如下:

Pattern<NetworkStreamWarning, ?> alertPattern = Pattern.<NetworkStreamWarning>
    begin("first").next("second").within(Time.seconds(30));

在警告事件流中应用告警模式,得到告警模式流:

PatternStream<NetworkStreamWarning> alertPatternStream = CEP.pattern(warnings.keyBy
    ("dataCenterId"), alertPattern);

在告警模式流中匹配警告模式对,如果模式对中第一个警告对象的平均流量值小于第二个警告对象的平均流量值,则构建告警对象并输出该对象从而形成告警流:

DataStream<NetworkStreamAlert> alerts = alertPatternStream.flatSelect(
    (Map<String, NetworkStreamWarning> pattern, Collector<NetworkStreamAlert> out) -> {
        NetworkStreamWarning first = pattern.get("first");
        NetworkStreamWarning second = pattern.get("second");

        //first avg < second avg
        if (first.getAverageStream() < second.getAverageStream()) {
            out.collect(new NetworkStreamAlert(first.getDataCenterId()));
        }
    }
);

最终,sink到控制台:

warnings.print();
alerts.print();

从上面的代码段可见,CEP的关键是定义合适的模式。关于模式的相关的API,我们之前已进行过分析。为了节省篇幅,本文只列出了核心代码片段。

需要注意的是,因为包含Java 8的lambdas,当你使用javac作为编译器时,将会得到错误提示:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: 
The generic type parameters of 'Map' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to 
use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1331)
at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1317)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:347)
at org.apache.flink.cep.PatternStream.select(PatternStream.java:81)
at com.diveintoapacheflink.chapter11.NetworkAttackMonitor.main(NetworkAttackMonitor.java:55)
at ...

解决方案是使用Eclipse JDT来编译代码。


原文发布时间为:2017-03-01
本文作者:vinoYang
本文来自云栖社区合作伙伴 CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
13天前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
50 5
|
10天前
|
机器学习/深度学习 数据采集 存储
时间序列预测新突破:深入解析循环神经网络(RNN)在金融数据分析中的应用
【10月更文挑战第7天】时间序列预测是数据科学领域的一个重要课题,特别是在金融行业中。准确的时间序列预测能够帮助投资者做出更明智的决策,比如股票价格预测、汇率变动预测等。近年来,随着深度学习技术的发展,尤其是循环神经网络(Recurrent Neural Networks, RNNs)及其变体如长短期记忆网络(LSTM)和门控循环单元(GRU),在处理时间序列数据方面展现出了巨大的潜力。本文将探讨RNN的基本概念,并通过具体的代码示例展示如何使用这些模型来进行金融数据分析。
68 2
|
13天前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
43 0
|
13天前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
63 0
|
10天前
|
存储 安全 网络安全
云端盾牌:云计算时代的网络安全守护在数字化浪潮中,云计算以其高效、灵活的特性成为企业转型的加速器。然而,伴随其迅猛发展,网络安全问题亦如影随形,成为悬在每个组织头顶的达摩克利斯之剑。本文旨在探讨云计算服务中的网络安全挑战,分析信息安全的重要性,并提出相应对策,以期为企业构建一道坚实的云端防护网。
在当今这个数据驱动的时代,云计算已成为推动创新与效率的关键力量。它允许用户随时随地访问强大的计算资源,降低了企业的运营成本,加速了产品上市时间。但随之而来的网络威胁也日益猖獗,尤其是对于依赖云服务的企业而言,数据泄露、身份盗用等安全事件频发,不仅造成经济损失,更严重损害品牌信誉。本文深入剖析云计算环境中的安全风险,强调建立健全的信息安全管理机制的重要性,并分享一系列有效策略,旨在帮助企业和个人用户在享受云服务带来的便利的同时,也能构筑起强有力的网络防线。
|
6天前
|
安全 网络协议 物联网
物联网僵尸网络和 DDoS 攻击的 CERT 分析
物联网僵尸网络和 DDoS 攻击的 CERT 分析
|
10天前
|
存储 算法 数据可视化
单细胞分析 | Cicero+Signac 寻找顺式共可及网络
单细胞分析 | Cicero+Signac 寻找顺式共可及网络
18 0
|
13天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
37 0
|
13天前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
27 0
|
11天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:关于网络安全漏洞、加密技术、安全意识等方面的知识分享
【10月更文挑战第6天】在数字化时代,网络安全和信息安全已成为我们生活中不可或缺的一部分。本文将探讨网络安全漏洞、加密技术和安全意识等方面的内容,以帮助读者更好地了解这些主题,并采取适当的措施保护自己的信息安全。我们将通过代码示例来演示一些常见的安全漏洞,并提供解决方案。最后,我们将强调培养良好的安全意识对于维护个人和组织的信息安全的重要性。