【Flink】(十三)Flink CEP Library 使用案例分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(十三)Flink CEP Library 使用案例分析

文章目录


一、前言

二、CEPTest

三、Alert

四、MonitoringEvent

五、TemperatureEvent


一、前言


根据Flink CEP library来监控数据中心中每个机柜的温度。当在一定的时间内,如果有2个连续的Event中的温度超过设置的阈值时,就产生一条警告;一条警告也许还不是很坏的结果,但是如果我们在同一个机柜上连续看到2条这种警告,这种情况比较严重了。所以根据第一个警告流的输出,通过定义另一个Pattern,以上一步的输出作为第二个pattern的输入,来定义一个“严重”的问题。


二、CEPTest


package cep;


import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.List;
import java.util.Map;
public class CEPTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // DataStream : source
        DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),
                new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1),
                new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7),
                new TemperatureEvent("xyz", 27.0), new TemperatureEvent("xyz", 30.0));
        // 定义Pattern,检查10秒钟内温度是否高于26度
        Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
                .subtype(TemperatureEvent.class)
                .where(new SimpleCondition<TemperatureEvent>() {
                    public boolean filter(TemperatureEvent subEvent) {
                        if (subEvent.getTemperature() >= 26.0) {
                            return true;
                        }
                        return false;
                    }
                })
                .within(Time.seconds(10));
        //匹配pattern并select事件,符合条件的发生警告,即输出
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
                .select(
                        new PatternSelectFunction<TemperatureEvent, Alert>() {
                            @Override
                            public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                                return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
                            }
                        });
        patternStream.print();
        env.execute("CEP on Temperature Sensor");
    }
}

三、Alert


package cep;


public class Alert {
    private String message;
    public String getMessage() {
        return message;
    }
    public void setMessage(String message) {
        this.message = message;
    }
    public Alert(String message) {
        this.message = message;
    }
    @Override
    public String toString() {
        return "Alert [message=" + message + "]";
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((message == null) ? 0 : message.hashCode());
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(obj == null) return false;
        if(getClass() != obj.getClass()) return false;
        Alert other = (Alert) obj;
        if(message == null) {
            if(other.message != null) {
                return false;
            }else if(!message.equals(other.message)) {
                return false;
            }
        }
        return true;
    }
}


四、MonitoringEvent


package cep;


public abstract class MonitoringEvent {
    private String machineName;
    public String getMachineName() {
        return machineName;
    }
    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((machineName == null) ? 0 : machineName.hashCode());
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(obj == null) return false;
        if(getClass() != obj.getClass()) return false;
        MonitoringEvent other = (MonitoringEvent) obj;
        if(machineName == null) {
            if(other.machineName != null) {
                return false;
            }else if(!machineName.equals(other.machineName)) {
                return false;
            }
        }
        return true;
    }
    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }
}


五、TemperatureEvent


package cep;


public class TemperatureEvent extends MonitoringEvent{
    public TemperatureEvent(String machineName) {
        super(machineName);
    }
    private double temperature;
    public double getTemperature() {
        return temperature;
    }
    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        long temp;
        temp = Double.doubleToLongBits(temperature);
        result = (int) (prime * result +(temp ^ (temp >>> 32)));
        return result;
    }
    @Override
    public boolean equals(Object obj) {
        if(this == obj) return true;
        if(!super.equals(obj)) return false;
        if(getClass() != obj.getClass()) return false;
        TemperatureEvent other = (TemperatureEvent) obj;
        if(Double.doubleToLongBits(temperature) != Double.doubleToLongBits(other.temperature)) return false;
        return true;
    }
    @Override
    public String toString() {
        return "TemperatureEvent [getTemperature()=" + getTemperature() + ", getMachineName=" + getTemperature() + "]";
    }
    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }
}


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
1月前
|
SQL 缓存 分布式计算
flink1.18 SqlGateway 的使用和原理分析
# 了解flink1.18 sqlGateway 的安装和使用步骤 # 启动sqlgateway 流程,了解核心的结构 # sql提交流程,了解sql 的流转逻辑 # select 查询的ResultSet的对接流程,了解数据的返回和获取逻辑
|
1月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
34 3
|
2月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
39 1
|
2月前
|
消息中间件 Kafka API
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
【极数系列】ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter & 详细分析解决
|
3月前
|
存储 NoSQL MongoDB
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
本文整理自阿里云 Flink 团队归源老师关于阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference 的研究。
46940 2
阿里云 Flink 原理分析与应用:深入探索 MongoDB Schema Inference
|
4月前
|
消息中间件 监控 Java
一次线上Flink 背压情况分析之重新认识java dump 文件
一次线上Flink 背压情况分析之重新认识java dump 文件
51 0
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
44 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
74 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
63 0