实时即未来,车联网项目之电子围栏分析【六】

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect

电子围栏简介和应用场景


  • 电子围栏简介和意义


地理围栏是一个虚拟的空间围栏,可以帮助开发者检测人或物何时进入或离开预定义区域,并支持实时报警功能。


  • 电子围栏的应用场景


1.签到打卡类场景

2.共享单车类场景

3.线下门店促销场景


  • 创建电子围栏


  • 在此项目中,使用的电子围栏是规则的圆形,判断是否在圆形电子围栏区域内,可以使用车辆位置和中心点球面距离小于等于半径,在电子围栏的区域内。
  • 还有一些不规则的电子围栏,这些可以使用射线取点的个数来判断是否在电子围栏内,如果是偶数在电子围栏外,否则是电子围栏内。


电子围栏规则和分析结果数据结构


  • 电子围栏的定义


459f6b68eb5ef6c7318e89ae79d104ac.png



  • 电子围栏规则数据结构


0c1cc99ca773d01a3d5fa2d9c359e108.png


  • 字段


  • 数据样本示例


1bc6b3f9e5afeaadd80b4bd2de34add3.png


  • 电子围栏分析结果数据结构


f927d757306244ba6ed1278944f4eb75.png


  • 字段


电子围栏分析步骤


02995a0e2db363ace6a2b8a6630303b6.png


  • 电子围栏任务8大步骤


1.电子围栏分析任务设置、原始数据json解析、过滤异常数据


2.读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)


3.原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream)


4.创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)


5.读取电子围栏分析结果表数据并广播


6.翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect


7.对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)


8.电子围栏分析结果数据落地mysql,也可以选择落地mongo


电子围栏分析任务实现


  • 电子栅栏分析的逻辑图


  • 电子围栏分析主类:ElectricFenceTask


  • 简化 ItcastDataObj 对象:ItcastDataPartObj.java


  • 简化解析 ItcastParseUtil 对象: JsonParsePartUtil.java


  • 测试工具类对象


//实现步骤:
1)初始化flink流处理的运行环境(设置按照事件时间处理数据、设置hadoopHome的用户名、设置checkpoint)
2)读取kafka数据源(调用父类的方法)
3)将字符串转换成javaBean(ItcastDataPartObj)对象
4)过滤出来正常数据
5)读取电子围栏规则数据以及电子围栏规则关联的车辆数据并进行广播
6)将原始数据(消费的kafka数据)与电子围栏规则数据进行关联操作(Connect)并flatMap为 ElectricFenceRulesFuntion
7)对上步数据分配水印(30s)并根据 vin 分组后应用90s滚动窗口,然后对窗口进行自定义函数的开发(计算出来该窗口的数据属于电子围栏外还是电子围栏内)
8)读取电子围栏分析结果表的数据并进行广播
9)对第七步和第八步产生的数据进行关联操作(connect)
10)对第九步的结果进行滚动窗口操作,应用自定义窗口函数(实现添加uuid和inMysql属性赋值)
11)将分析后的电子围栏结果数据实时写入到mysql数据库中
12)运行作业,等待停止


广播状态与实现



  • 回顾广播变量概念


广播变量就是将变量广播到各个 taskmanager的内存中,可以共享数据,一般情况下广播变量的类型是 map 类型 key->value


  • 广播变量的数据格式是——map类型state


  • 如何使用广播变量


HashMap ,其中String:vin


  • 电子围栏转换临时对象——ElectricFenceResultTmp


@Data
@AllArgsConstructor
public class ElectricFenceResultTmp {
    //电子围栏id
    private int id;
    //电子围栏名称
    private String name;
    //电子围栏中心地址
    private String address;
    //电子围栏半径
    private float radius;
    //电子围栏中心点的经度
    private double longitude;
    //电子围栏中心点的维度
    private double latitude;
    //电子围栏的开始时间
    private Date startTime;
    //电子围栏的结束时间
    private Date endTime;
    @Override
    public String toString() {
        return "ElectricFenceResultTmp{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", radius=" + radius +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", startTime=" + startTime +
                ", endTime=" + endTime +
                '}';
    }
}


  • 自定义 source 读取 MySQL 的数据源并广播


。定义读取电子围栏规则类——MysqlElectricFenceSouce


返回类型为 HashMap


//读取mysql存储的电子围栏规则表数据以及电子围栏规则关联的电子围栏规则车辆表数据,根据分析,一个车辆可能适配多个电子围栏规则,所以返回的数据类型定义为HashMap<vin, 电子围栏规则对象>,为了方便处理,我们只处理一个车辆关联一个电子围栏规则的场景(真事的业务开发中一定是一个车辆可能有很多很多对应电子围栏规则的)。
//继承 RichSourceFunction<HashMap<String, ElectricFenceResultTmp>>
//1.重写 open 方法
//1.1 获取上下文中的 parameterTool
//1.2 读取配置文件中,注册驱动 url user password
//1.3 实例化statement
//2.重写 close 方法
//2.1 关闭 statement 和 conn
//3.重写 run 方法
//3.1 每指定时间循环读取 mysql 中的电子围栏规则
//3.2 收集 electricFenceResult 指定休眠时间
//4.重写 cancel 方法


  • 读取数据库中配置信息


select vins.vin,setting.id,setting.name,setting.address,setting.radius,setting.longitude,setting.latitude,setting.start_time,setting.end_time 
from vehicle_networking.electronic_fence_setting setting 
inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id 
where setting.status=1


电子围栏中的 ConnectStreamed应用


connect流说明


connect流使用场景


两点之间球面距离的计算——DistanceCaculateUtil


  • 导入工具jar包坐标


<!-- geodesy地址位置查询依赖 -->
<dependency>
    <groupId>org.gavaghan</groupId>
    <artifactId>geodesy</artifactId>
    <version>${geodesy.version}</version>
</dependency>


  • 两点之间球面距离的计算工具类


/**
 * TODO 球面距离计算工具类;根据两个点的经纬度,计算出距离
 */
public class DistanceCaculateUtil {
    /**
     * @desc:计算地址位置方法,坐标系、经纬度用于计算距离(直线距离)
     * @param gpsFrom
     * @param gpsTo
     * @param ellipsoid
     * @return 计算距离
     */
    private static Double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) {
        //
        GeodeticCurve geodeticCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo);
        return geodeticCurve.getEllipsoidalDistance();
    }
    /**
     * @desc:使用传入的ellipsoidsphere方法计算距离
     * @param latitude 位置1经度
     * @param longitude 位置1维度
     * @param latitude2 位置2经度
     * @param longitude2 位置2维度
     * @param ellipsoid 椭圆计算算法
     * @return
     */
    private static Double ellipsoidMethodDistance(Double latitude, Double longitude, Double latitude2, Double longitude2, Ellipsoid ellipsoid){
        // todo 位置点经度、维度不为空 位置点2经度、维度不为空 椭圆算法
        Objects.requireNonNull(latitude, "latitude is not null");
        Objects.requireNonNull(longitude, "longitude is not null");
        Objects.requireNonNull(latitude2, "latitude2 is not null");
        Objects.requireNonNull(longitude2, "longitude2 is not null");
        Objects.requireNonNull(ellipsoid, "ellipsoid method is not null");
        // todo 地球坐标对象:封装经度维度坐标对象
        GlobalCoordinates source = new GlobalCoordinates(latitude, longitude);
        GlobalCoordinates target = new GlobalCoordinates(latitude2, longitude2);
        // todo 椭圆范围计算方法
        return getDistanceMeter(source, target, ellipsoid);
    }
    /**
     * @desc:使用ellipsoidsphere方法计算距离
     * @param latitude
     * @param longitude
     * @param latitude2
     * @param longitude2
     * @return distance 单位:m
     */
    public static Double getDistance(Double latitude,Double longitude,Double latitude2,Double longitude2) {
        // 椭圆范围计算方法:Ellipsoid.Sphere
        return ellipsoidMethodDistance(latitude, longitude, latitude2, longitude2, Ellipsoid.Sphere);
    }
}


电子围栏中自定义对象将两个数据流合并


  • 通过关联两个数据流后CoFlatMap 后生成实体类—— ElectricFenceModel


/**
 * 电子围栏规则计算模型
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElectricFenceModel implements Comparable<ElectricFenceModel> {
    //车架号
    private String vin = "";
    //电子围栏结果表UUID
    private Long uuid = -999999L;
    //上次状态 0 里面 1 外面
    private int lastStatus = -999999;
    //当前状态 0  里面 1 外面
    private int nowStatus = -999999;
    //位置时间 yyyy-MM-dd HH:mm:ss
    private String gpsTime = "";
    //位置纬度--
    private Double lat = -999999D;
    //位置经度--
    private Double lng = -999999D;
    //电子围栏ID
    private int eleId = -999999;
    //电子围栏名称
    private String eleName = "";
    //中心点地址
    private String address = "";
    //中心点纬度
    private Double latitude;
    //中心点经度
    private Double longitude = -999999D;
    //电子围栏半径
    private Float radius = -999999F;
    //出围栏时间
    private String outEleTime = null;
    //进围栏时间
    private String inEleTime = null;
    //是否在mysql结果表中
    private Boolean inMysql = false;
    //状态报警 0:出围栏 1:进围栏
    private int statusAlarm = -999999;
    //报警信息
    private String statusAlarmMsg = "";
    //终端时间
    private String terminalTime = "";
    // 扩展字段 终端时间
    private Long terminalTimestamp = -999999L;
    @Override
    public int compareTo(ElectricFenceModel o) {
        if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){
            return  1;
        }
        else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){
            return  -1;
        }else{
            return 0;
        }
    }
}


  • 实现将两个流合并CoFlatMapFunction接口—— ElectricFenceRulesFuntion


//1.定义返回的 ElectricFenceModel
        //2.判断如果流数据数据质量(车辆的经纬度不能为0或-999999,车辆GpsTime不能为空)
        //2.1.获取当前车辆的 vin
        //2.2.通过vin获取电子围栏的配置信息
        //2.3.如果电子围栏配置信息不为空
        //2.3.1.说明当前车辆关联了电子围栏规则,需要判断当前上报的数据是否在电子围栏规则的生效时间内,先获取上报地理位置时间gpsTimestamp
        //2.3.2.如果当前gpsTimestamp>=开始时间戳并且gpsTimestamp<=结束时间戳,以下内容存入到 ElectricFenceModel
        //2.3.2.1.上报车辆的数据在电子围栏生效期内 vin gpstime lng lat 终端时间和终端时间戳
        //2.3.2.2.电子围栏id,电子围栏名称,地址,半径
        //2.3.2.3.电子围栏经纬度
        //2.3.2.4.计算经纬度和电子围栏经纬度距离距离,如果两点之间大于半径(单位是千米)的距离,就是存在于圆外,否则反之
        //2.3.2.5.收集结果数据


设置窗口并计算确定是否在电子围栏内告警


  • 设置水印机制


  • 根据 vin 进行分组


  • 创建 90 秒翻滚窗口


  • 自定义电子围栏窗口实现类:ElectricFenceWindowFunction


//对电子围栏进行自定义窗口操作,处理电子围栏判断逻辑
//继承 RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow>
//1.定义存储历史电子围栏数据的state,<vin,是否在电子围栏内0:内,1:外> MapState<String, Integer>
//2.重写open方法
//2.1 定义mapState的描述器(相当于表结构) <String,Integer>
//2.2 获取 parameterTool,用来读取配置文件参数
//2.3 读取状态的超时时间 "vehicle.state.last.period" ,构建ttl设置更新类型和状态可见
//2.4 设置状态描述 StateTtlConfig,开启生命周期时间
//2.5 获取map状态


  • apply 方法步骤如下


    //1.创建返回对象
        //2.对窗口内的数据进行排序
        //3.从 state 中获取车辆vin对应的上一次窗口电子围栏lastStateValue标记(车辆上一次窗口是否在电子围栏中)0:电子围栏内 1:电子围栏外
        //4.如果上次状态为空,初始化赋值
        //5.判断当前处于电子围栏内还是电子围栏外
        //5.1.定义当前车辆电子围栏内出现的次数
        //5.2.定义当前车辆电子围栏外出现的次数
        //6.定义当前窗口的电子围栏状态
        //7. 90s内车辆出现在电子围栏内的次数多于出现在电子围栏外的次数,则认为当前处于电子围栏内
        //8. 将当前窗口的电子围栏状态写入到 state 中,供下次判断
        //9.如果当前电子围栏状态与上一次电子围栏状态不同
        //9.1.如果上一次窗口处于电子围栏外,而本次是电子围栏内,则将进入电子围栏的时间写入到数据库中
        //9.1.1.过滤出来状态为0的第一条数据
        //9.1.2.拷贝属性给 electricFenceModel 并将进入终端时间赋值,并且将状态告警字段赋值为1 0:出围栏 1:进围栏,将数据collect返回
        //9.2.如果上一次窗口处于电子围栏内,而本次是电子围栏外,则将出电子围栏的时间写入到数据库中
        //9.2.1.过滤出来状态倒序为1的第一条数据
        //9.2.2.拷贝属性给 electricFenceModel 并将出终端时间赋值,并且将状态告警 0:出围栏 1:进围栏,将数据collect返回


  • 如果判断为进入到电子围栏,进入到电子围栏的第一条数据的时间会被记录下来


合并分析电子围栏结果


读取电子围栏分析结果并广播


  • 读取mysql的电子围栏结果表的数据——MysqlElectricFenceResultSource


//读取电子围栏分析结果表的数据,并进行广播
//继承自 RichSourceFunction<HashMap<String, Long>>
//1.重写 open 方法,初始化连接
//1.1 编写sql "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin;"
//2.重写 close 方法
//3.重写 run 方法 获取出来vin 和 id 封装成map并返回
//4.重写 cancel 方法


  • 读取电子栅栏的 vin 和 最近id


select vin,min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null group by vin


  • 将读取的电子栅栏信息数据流广播出去


窗口流数据与广播流数据连接


  • 将电子栅栏模型数据流和电子栅栏 获取的流进行关联,并进行 flatMap


  • 实现电子围栏分析结果模型添加 uuid 和 inMysql 字段 —— ElectricFenceModelFunction


//实现 CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel>
//1.重写flatMap1方法
//1.1.通过getvin获取配置流中是否存在值
//2.如果不为 null
//2.1.设置为当前时间戳
//2.2.设置库中InMysql是否存在为 true
//3.否则
//3.1.设置 uuid 为最大值-当前时间戳
//3.2 设置库中是否存在为 false
//4.收集数据
//5.重写 flatMap2 方法
//5.1.读取配置数据


电子围栏分析结果入库


  • 将电子围栏分析结果数据写入到 mysql 数据库中 —— ElectricFenceMysqlSink


//继承于 RichSinkFunction<ElectricFenceModel>
//1. 重写 open 方法,获取参数,创建连接
//2. 重写 invoke 方法,
//2.1 出围栏(且能获取到进围栏状态的)则修改进围栏的状态, 否则 进入围栏,转换ElectricFenceModel对象,插入结构数据到电子围栏结果表
//3. 重写 close 方法


测试电子围栏

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
目录
相关文章
|
JavaScript Java 关系型数据库
Springboot+vue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。
Springboot+vue的应急救援物资管理系统,Javaee项目,springboot vue前后端分离项目。
|
Kubernetes 监控 Cloud Native
Kubernetes集群的高可用性与伸缩性实践
Kubernetes集群的高可用性与伸缩性实践
272 1
|
SQL 存储 关系型数据库
如何巧用索引优化SQL语句性能?
本文从索引角度探讨了如何优化MySQL中的SQL语句性能。首先介绍了如何通过查看执行时间和执行计划定位慢SQL,并详细解析了EXPLAIN命令的各个字段含义。接着讲解了索引优化的关键点,包括聚簇索引、索引覆盖、联合索引及最左前缀原则等。最后,通过具体示例展示了索引如何提升查询速度,并提供了三层B+树的存储容量计算方法。通过这些技巧,可以帮助开发者有效提升数据库查询效率。
1069 2
|
Ubuntu 应用服务中间件 网络安全
Ubuntu 22.04环境下为Odoo开启80端口的方法
通过以上步骤,你应该能够在Ubuntu 22.04环境下为Odoo开启80端口。访问你的域名时,Nginx会将请求代理到Odoo,允许你通过80端口访问Odoo应用。
488 1
|
大数据 Python
【Python DataFrame专栏】DataFrame内存管理与优化:大型数据集处理技巧
【5月更文挑战第20天】本文介绍了使用Python的pandas库优化DataFrame内存管理的六个技巧:1) 查看DataFrame内存占用;2) 使用高效数据类型,如`category`和`int32`;3) 仅读取需要的列;4) 分块处理大数据集;5) 利用`inplace`参数节省内存;6) 使用`eval()`和`query()`进行快速筛选。这些方法有助于处理大型数据集时提高效率。
630 3
【Python DataFrame专栏】DataFrame内存管理与优化:大型数据集处理技巧
|
JavaScript 安全 开发工具
在 Vue 3 中使用 TypeScript
【10月更文挑战第3天】
|
数据采集 存储 数据管理
cdga|数据治理策略:击破壁垒,迈向纵向一体化的新纪元
企业将逐步击破数据壁垒,实现数据的纵向一体化。这意味着企业能够更高效地整合内外部数据资源,形成全面、准确、及时的数据视图,为管理层提供有力的决策支持。同时,数据的一体化也将促进业务流程的优化和创新,推动企业向智能化、数字化转型迈进。
cdga|数据治理策略:击破壁垒,迈向纵向一体化的新纪元
|
前端开发 API
(WEB前端编辑DWG)在线CAD如何实现图形识别功能
mxcad 提供的图形识别功能可帮助用户快速识别和提取 CAD 图纸中的各种图形,如直线、多段线、弧线、圆及图块,显著提升设计效率。此功能不仅适用于图形分类,还能进行数量统计和快速定位,减少手动操作。用户可通过 API 进行二次开发,自定义识别逻辑。具体步骤包括打开在线示例、选择识别功能、设置识别参数并开始识别。更多开发文档请关注公众号:梦想云图网页 CAD。
lxml.etree.XPathEvalError: Invalid expression
lxml.etree.XPathEvalError: Invalid expression
183 4
|
自然语言处理 算法
NLP之距离算法Levenshtein
NLP之距离算法Levenshtein