实时即未来,车联网项目之电子围栏分析【六】

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect

电子围栏简介和应用场景


  • 电子围栏简介和意义


地理围栏是一个虚拟的空间围栏,可以帮助开发者检测人或物何时进入或离开预定义区域,并支持实时报警功能。


  • 电子围栏的应用场景


1.签到打卡类场景

2.共享单车类场景

3.线下门店促销场景


  • 创建电子围栏


  • 在此项目中,使用的电子围栏是规则的圆形,判断是否在圆形电子围栏区域内,可以使用车辆位置和中心点球面距离小于等于半径,在电子围栏的区域内。
  • 还有一些不规则的电子围栏,这些可以使用射线取点的个数来判断是否在电子围栏内,如果是偶数在电子围栏外,否则是电子围栏内。


电子围栏规则和分析结果数据结构


  • 电子围栏的定义


459f6b68eb5ef6c7318e89ae79d104ac.png



  • 电子围栏规则数据结构


0c1cc99ca773d01a3d5fa2d9c359e108.png


  • 字段


  • 数据样本示例


1bc6b3f9e5afeaadd80b4bd2de34add3.png


  • 电子围栏分析结果数据结构


f927d757306244ba6ed1278944f4eb75.png


  • 字段


电子围栏分析步骤


02995a0e2db363ace6a2b8a6630303b6.png


  • 电子围栏任务8大步骤


1.电子围栏分析任务设置、原始数据json解析、过滤异常数据


2.读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)


3.原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream)


4.创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)


5.读取电子围栏分析结果表数据并广播


6.翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect


7.对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)


8.电子围栏分析结果数据落地mysql,也可以选择落地mongo


电子围栏分析任务实现


  • 电子栅栏分析的逻辑图


  • 电子围栏分析主类:ElectricFenceTask


  • 简化 ItcastDataObj 对象:ItcastDataPartObj.java


  • 简化解析 ItcastParseUtil 对象: JsonParsePartUtil.java


  • 测试工具类对象


//实现步骤:
1)初始化flink流处理的运行环境(设置按照事件时间处理数据、设置hadoopHome的用户名、设置checkpoint)
2)读取kafka数据源(调用父类的方法)
3)将字符串转换成javaBean(ItcastDataPartObj)对象
4)过滤出来正常数据
5)读取电子围栏规则数据以及电子围栏规则关联的车辆数据并进行广播
6)将原始数据(消费的kafka数据)与电子围栏规则数据进行关联操作(Connect)并flatMap为 ElectricFenceRulesFuntion
7)对上步数据分配水印(30s)并根据 vin 分组后应用90s滚动窗口,然后对窗口进行自定义函数的开发(计算出来该窗口的数据属于电子围栏外还是电子围栏内)
8)读取电子围栏分析结果表的数据并进行广播
9)对第七步和第八步产生的数据进行关联操作(connect)
10)对第九步的结果进行滚动窗口操作,应用自定义窗口函数(实现添加uuid和inMysql属性赋值)
11)将分析后的电子围栏结果数据实时写入到mysql数据库中
12)运行作业,等待停止


广播状态与实现



  • 回顾广播变量概念


广播变量就是将变量广播到各个 taskmanager的内存中,可以共享数据,一般情况下广播变量的类型是 map 类型 key->value


  • 广播变量的数据格式是——map类型state


  • 如何使用广播变量


HashMap ,其中String:vin


  • 电子围栏转换临时对象——ElectricFenceResultTmp


@Data
@AllArgsConstructor
public class ElectricFenceResultTmp {
    //电子围栏id
    private int id;
    //电子围栏名称
    private String name;
    //电子围栏中心地址
    private String address;
    //电子围栏半径
    private float radius;
    //电子围栏中心点的经度
    private double longitude;
    //电子围栏中心点的维度
    private double latitude;
    //电子围栏的开始时间
    private Date startTime;
    //电子围栏的结束时间
    private Date endTime;
    @Override
    public String toString() {
        return "ElectricFenceResultTmp{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", address='" + address + '\'' +
                ", radius=" + radius +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                ", startTime=" + startTime +
                ", endTime=" + endTime +
                '}';
    }
}


  • 自定义 source 读取 MySQL 的数据源并广播


。定义读取电子围栏规则类——MysqlElectricFenceSouce


返回类型为 HashMap


//读取mysql存储的电子围栏规则表数据以及电子围栏规则关联的电子围栏规则车辆表数据,根据分析,一个车辆可能适配多个电子围栏规则,所以返回的数据类型定义为HashMap<vin, 电子围栏规则对象>,为了方便处理,我们只处理一个车辆关联一个电子围栏规则的场景(真事的业务开发中一定是一个车辆可能有很多很多对应电子围栏规则的)。
//继承 RichSourceFunction<HashMap<String, ElectricFenceResultTmp>>
//1.重写 open 方法
//1.1 获取上下文中的 parameterTool
//1.2 读取配置文件中,注册驱动 url user password
//1.3 实例化statement
//2.重写 close 方法
//2.1 关闭 statement 和 conn
//3.重写 run 方法
//3.1 每指定时间循环读取 mysql 中的电子围栏规则
//3.2 收集 electricFenceResult 指定休眠时间
//4.重写 cancel 方法


  • 读取数据库中配置信息


select vins.vin,setting.id,setting.name,setting.address,setting.radius,setting.longitude,setting.latitude,setting.start_time,setting.end_time 
from vehicle_networking.electronic_fence_setting setting 
inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id 
where setting.status=1


电子围栏中的 ConnectStreamed应用


connect流说明


connect流使用场景


两点之间球面距离的计算——DistanceCaculateUtil


  • 导入工具jar包坐标


<!-- geodesy地址位置查询依赖 -->
<dependency>
    <groupId>org.gavaghan</groupId>
    <artifactId>geodesy</artifactId>
    <version>${geodesy.version}</version>
</dependency>


  • 两点之间球面距离的计算工具类


/**
 * TODO 球面距离计算工具类;根据两个点的经纬度,计算出距离
 */
public class DistanceCaculateUtil {
    /**
     * @desc:计算地址位置方法,坐标系、经纬度用于计算距离(直线距离)
     * @param gpsFrom
     * @param gpsTo
     * @param ellipsoid
     * @return 计算距离
     */
    private static Double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) {
        //
        GeodeticCurve geodeticCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo);
        return geodeticCurve.getEllipsoidalDistance();
    }
    /**
     * @desc:使用传入的ellipsoidsphere方法计算距离
     * @param latitude 位置1经度
     * @param longitude 位置1维度
     * @param latitude2 位置2经度
     * @param longitude2 位置2维度
     * @param ellipsoid 椭圆计算算法
     * @return
     */
    private static Double ellipsoidMethodDistance(Double latitude, Double longitude, Double latitude2, Double longitude2, Ellipsoid ellipsoid){
        // todo 位置点经度、维度不为空 位置点2经度、维度不为空 椭圆算法
        Objects.requireNonNull(latitude, "latitude is not null");
        Objects.requireNonNull(longitude, "longitude is not null");
        Objects.requireNonNull(latitude2, "latitude2 is not null");
        Objects.requireNonNull(longitude2, "longitude2 is not null");
        Objects.requireNonNull(ellipsoid, "ellipsoid method is not null");
        // todo 地球坐标对象:封装经度维度坐标对象
        GlobalCoordinates source = new GlobalCoordinates(latitude, longitude);
        GlobalCoordinates target = new GlobalCoordinates(latitude2, longitude2);
        // todo 椭圆范围计算方法
        return getDistanceMeter(source, target, ellipsoid);
    }
    /**
     * @desc:使用ellipsoidsphere方法计算距离
     * @param latitude
     * @param longitude
     * @param latitude2
     * @param longitude2
     * @return distance 单位:m
     */
    public static Double getDistance(Double latitude,Double longitude,Double latitude2,Double longitude2) {
        // 椭圆范围计算方法:Ellipsoid.Sphere
        return ellipsoidMethodDistance(latitude, longitude, latitude2, longitude2, Ellipsoid.Sphere);
    }
}


电子围栏中自定义对象将两个数据流合并


  • 通过关联两个数据流后CoFlatMap 后生成实体类—— ElectricFenceModel


/**
 * 电子围栏规则计算模型
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ElectricFenceModel implements Comparable<ElectricFenceModel> {
    //车架号
    private String vin = "";
    //电子围栏结果表UUID
    private Long uuid = -999999L;
    //上次状态 0 里面 1 外面
    private int lastStatus = -999999;
    //当前状态 0  里面 1 外面
    private int nowStatus = -999999;
    //位置时间 yyyy-MM-dd HH:mm:ss
    private String gpsTime = "";
    //位置纬度--
    private Double lat = -999999D;
    //位置经度--
    private Double lng = -999999D;
    //电子围栏ID
    private int eleId = -999999;
    //电子围栏名称
    private String eleName = "";
    //中心点地址
    private String address = "";
    //中心点纬度
    private Double latitude;
    //中心点经度
    private Double longitude = -999999D;
    //电子围栏半径
    private Float radius = -999999F;
    //出围栏时间
    private String outEleTime = null;
    //进围栏时间
    private String inEleTime = null;
    //是否在mysql结果表中
    private Boolean inMysql = false;
    //状态报警 0:出围栏 1:进围栏
    private int statusAlarm = -999999;
    //报警信息
    private String statusAlarmMsg = "";
    //终端时间
    private String terminalTime = "";
    // 扩展字段 终端时间
    private Long terminalTimestamp = -999999L;
    @Override
    public int compareTo(ElectricFenceModel o) {
        if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){
            return  1;
        }
        else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){
            return  -1;
        }else{
            return 0;
        }
    }
}


  • 实现将两个流合并CoFlatMapFunction接口—— ElectricFenceRulesFuntion


//1.定义返回的 ElectricFenceModel
        //2.判断如果流数据数据质量(车辆的经纬度不能为0或-999999,车辆GpsTime不能为空)
        //2.1.获取当前车辆的 vin
        //2.2.通过vin获取电子围栏的配置信息
        //2.3.如果电子围栏配置信息不为空
        //2.3.1.说明当前车辆关联了电子围栏规则,需要判断当前上报的数据是否在电子围栏规则的生效时间内,先获取上报地理位置时间gpsTimestamp
        //2.3.2.如果当前gpsTimestamp>=开始时间戳并且gpsTimestamp<=结束时间戳,以下内容存入到 ElectricFenceModel
        //2.3.2.1.上报车辆的数据在电子围栏生效期内 vin gpstime lng lat 终端时间和终端时间戳
        //2.3.2.2.电子围栏id,电子围栏名称,地址,半径
        //2.3.2.3.电子围栏经纬度
        //2.3.2.4.计算经纬度和电子围栏经纬度距离距离,如果两点之间大于半径(单位是千米)的距离,就是存在于圆外,否则反之
        //2.3.2.5.收集结果数据


设置窗口并计算确定是否在电子围栏内告警


  • 设置水印机制


  • 根据 vin 进行分组


  • 创建 90 秒翻滚窗口


  • 自定义电子围栏窗口实现类:ElectricFenceWindowFunction


//对电子围栏进行自定义窗口操作,处理电子围栏判断逻辑
//继承 RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow>
//1.定义存储历史电子围栏数据的state,<vin,是否在电子围栏内0:内,1:外> MapState<String, Integer>
//2.重写open方法
//2.1 定义mapState的描述器(相当于表结构) <String,Integer>
//2.2 获取 parameterTool,用来读取配置文件参数
//2.3 读取状态的超时时间 "vehicle.state.last.period" ,构建ttl设置更新类型和状态可见
//2.4 设置状态描述 StateTtlConfig,开启生命周期时间
//2.5 获取map状态


  • apply 方法步骤如下


    //1.创建返回对象
        //2.对窗口内的数据进行排序
        //3.从 state 中获取车辆vin对应的上一次窗口电子围栏lastStateValue标记(车辆上一次窗口是否在电子围栏中)0:电子围栏内 1:电子围栏外
        //4.如果上次状态为空,初始化赋值
        //5.判断当前处于电子围栏内还是电子围栏外
        //5.1.定义当前车辆电子围栏内出现的次数
        //5.2.定义当前车辆电子围栏外出现的次数
        //6.定义当前窗口的电子围栏状态
        //7. 90s内车辆出现在电子围栏内的次数多于出现在电子围栏外的次数,则认为当前处于电子围栏内
        //8. 将当前窗口的电子围栏状态写入到 state 中,供下次判断
        //9.如果当前电子围栏状态与上一次电子围栏状态不同
        //9.1.如果上一次窗口处于电子围栏外,而本次是电子围栏内,则将进入电子围栏的时间写入到数据库中
        //9.1.1.过滤出来状态为0的第一条数据
        //9.1.2.拷贝属性给 electricFenceModel 并将进入终端时间赋值,并且将状态告警字段赋值为1 0:出围栏 1:进围栏,将数据collect返回
        //9.2.如果上一次窗口处于电子围栏内,而本次是电子围栏外,则将出电子围栏的时间写入到数据库中
        //9.2.1.过滤出来状态倒序为1的第一条数据
        //9.2.2.拷贝属性给 electricFenceModel 并将出终端时间赋值,并且将状态告警 0:出围栏 1:进围栏,将数据collect返回


  • 如果判断为进入到电子围栏,进入到电子围栏的第一条数据的时间会被记录下来


合并分析电子围栏结果


读取电子围栏分析结果并广播


  • 读取mysql的电子围栏结果表的数据——MysqlElectricFenceResultSource


//读取电子围栏分析结果表的数据,并进行广播
//继承自 RichSourceFunction<HashMap<String, Long>>
//1.重写 open 方法,初始化连接
//1.1 编写sql "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin;"
//2.重写 close 方法
//3.重写 run 方法 获取出来vin 和 id 封装成map并返回
//4.重写 cancel 方法


  • 读取电子栅栏的 vin 和 最近id


select vin,min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null group by vin


  • 将读取的电子栅栏信息数据流广播出去


窗口流数据与广播流数据连接


  • 将电子栅栏模型数据流和电子栅栏 获取的流进行关联,并进行 flatMap


  • 实现电子围栏分析结果模型添加 uuid 和 inMysql 字段 —— ElectricFenceModelFunction


//实现 CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel>
//1.重写flatMap1方法
//1.1.通过getvin获取配置流中是否存在值
//2.如果不为 null
//2.1.设置为当前时间戳
//2.2.设置库中InMysql是否存在为 true
//3.否则
//3.1.设置 uuid 为最大值-当前时间戳
//3.2 设置库中是否存在为 false
//4.收集数据
//5.重写 flatMap2 方法
//5.1.读取配置数据


电子围栏分析结果入库


  • 将电子围栏分析结果数据写入到 mysql 数据库中 —— ElectricFenceMysqlSink


//继承于 RichSinkFunction<ElectricFenceModel>
//1. 重写 open 方法,获取参数,创建连接
//2. 重写 invoke 方法,
//2.1 出围栏(且能获取到进围栏状态的)则修改进围栏的状态, 否则 进入围栏,转换ElectricFenceModel对象,插入结构数据到电子围栏结果表
//3. 重写 close 方法


测试电子围栏

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
传感器 存储 监控
TDengine 签约北微传感,实现海量传感器数据的秒级响应
在当今物联网(IoT)快速发展的背景下,传感器技术已成为各个行业数字化转型的关键组成部分。随着设备数量的激增和数据生成速度的加快,如何高效地管理和分析这些数据,成为企业实现智能化运营的重要挑战。
50 0
|
4月前
|
自动驾驶
【Echarts大屏】自动驾驶车辆平台分享
【Echarts大屏】自动驾驶车辆平台分享
|
7月前
|
监控 数据可视化 安全
SaaS智慧工地云平台源码 视频监控、机械设备、环境监测、劳务分析、进度管理
自动监测塔机和施工升降机的运行情况,且在有危险源时及时发出报警和输出控制信号,并可全程记录特种设备的运行数据,将数据传输至云平台,为相应操作人员的管理提供数据支持。
119 1
|
7月前
|
传感器 机器学习/深度学习 监控
实时交通信息采集与处理
实时交通信息采集与处理
312 3
|
监控 数据可视化 智慧交通
1.2g可视化大屏项目分享【包含数字孪生、视频监控、智慧城市、智慧交通等】
1.2g可视化大屏项目分享【包含数字孪生、视频监控、智慧城市、智慧交通等】
1.2g可视化大屏项目分享【包含数字孪生、视频监控、智慧城市、智慧交通等】
|
监控 前端开发 定位技术
GIS跟踪监管系统电子围栏
GIS跟踪监管系统电子围栏
175 0
|
消息中间件 前端开发 Java
实时即未来,车联网项目之车辆驾驶行为分析【五】
单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。
359 0
实时即未来,车联网项目之车辆驾驶行为分析【五】
|
存储 NoSQL 关系型数据库
实时即未来,车联网项目之远程诊断实时故障分析【七】
geohash 就是将地图上位置(经纬度)转换成偶数位是经度、奇数数是维度,新的二进制字节,转换成字符串,用字符串代表某一个地理位置。
582 0
|
NoSQL 关系型数据库 MySQL
车联网场景下海量车辆状态数据存储实践
随着通信技术、计算机技术的不断发展,移动通信正在从人与人(H2H)向人与物(H2M)以及物与物(M2M)的方向发展,“万物互联”的概念正在逐步覆盖到各行各业中,例如智能家居、智能农业、智能交通、智能物流等领域。目前,车联网技术已经先行一步,在行车安全、交通管理、生活服务等方面得到充分应用。 车联网技术包括了车辆终端、云端、无线通信等方面。车辆终端实时产生大量车辆状态数
2067 0
车联网场景下海量车辆状态数据存储实践
|
传感器 数据采集 人工智能
面向自动驾驶的高精地图及数据采集生产体系
又到春招季!作为国民级出行服务平台,高德业务快速发展,大量校招/社招名额开放,欢迎大家投递简历,详情见文末。为帮助大家更了解高德技术,我们策划了#春招专栏#的系列文章,组织各业务团队的高年级同学以业务科普+技术应用实践为主要内容为大家做相关介绍。
599 0
面向自动驾驶的高精地图及数据采集生产体系
下一篇
DataWorks