【Flink】(十三)Flink CEP Library 使用案例分析

简介: 【Flink】(十三)Flink CEP Library 使用案例分析

文章目录


一、前言

二、CEPTest

三、Alert

四、MonitoringEvent

五、TemperatureEvent


一、前言


根据Flink CEP library来监控数据中心中每个机柜的温度。当在一定的时间内,如果有2个连续的Event中的温度超过设置的阈值时,就产生一条警告;一条警告也许还不是很坏的结果,但是如果我们在同一个机柜上连续看到2条这种警告,这种情况比较严重了。所以根据第一个警告流的输出,通过定义另一个Pattern,以上一步的输出作为第二个pattern的输入,来定义一个“严重”的问题。


二、CEPTest


package cep;


import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
public class CEPTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // DataStream : source
        DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),
                new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1),
                new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7),
                new TemperatureEvent("xyz", 27.0), new TemperatureEvent("xyz", 30.0));
        // 定义Pattern,检查10秒钟内温度是否高于26度
        Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
                .subtype(TemperatureEvent.class)
                .where(new SimpleCondition<TemperatureEvent>() {
                    public boolean filter(TemperatureEvent subEvent) {
                        if (subEvent.getTemperature() >= 26.0) {
                            return true;
                        }
                        return false;
                    }
                })
                .within(Time.seconds(10));
        //匹配pattern并select事件,符合条件的发生警告,即输出
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
                .select(
                        new PatternSelectFunction<TemperatureEvent, Alert>() {
                            @Override
                            public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                                return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                            }
                        });
        patternStream.print();
        env.execute("CEP on Temperature Sensor");
    }
}

三、Alert


package cep;


public class Alert {
    private String message;
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    public Alert(String message) {
        this.message = message;
    }
    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(obj == null) return false;
        if(getClass() != obj.getClass()) return false;
        Alert other = (Alert) obj;
        if(message == null) {
            if(other.message != null) {
                return false;
            }else if(!message.equals(other.message)) {
                return false;
            }
        }
        return true;
    }
}


四、MonitoringEvent


package cep;


public abstract class MonitoringEvent {
    private String machineName;
    public String getMachineName() {
        return machineName;
    }
    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(obj == null) return false;
        if(getClass() != obj.getClass()) return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if(machineName == null) {
            if(other.machineName != null) {
                return false;
            }else if(!machineName.equals(other.machineName)) {
                return false;
            }
        }
        return true;
    }
    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }
}


五、TemperatureEvent


package cep;


public class TemperatureEvent extends MonitoringEvent{
    public TemperatureEvent(String machineName) {
        super(machineName);
    }
    private double temperature;
    public double getTemperature() {
        return temperature;
    }
    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = (int) (prime * result +(temp ^ (temp >>> 32)));
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(!super.equals(obj)) return false;
        if(getClass() != obj.getClass()) return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if(Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature)) return false;
        return true;
    }
    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName=" + getTemperature() + "]";
    }
    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }
}


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
632 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
9月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
283 11
|
11月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
514 5
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1677 3
探索Flink动态CEP:杭州银行的实战案例
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
1625 27
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
1186 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
317 0
|
6月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
649 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄