电子围栏简介和应用场景
- 电子围栏简介和意义
地理围栏是一个虚拟的空间围栏,可以帮助开发者检测人或物何时进入或离开预定义区域,并支持实时报警功能。
- 电子围栏的应用场景
1.签到打卡类场景
2.共享单车类场景
3.线下门店促销场景
- 创建电子围栏
- 在此项目中,使用的电子围栏是规则的圆形,判断是否在圆形电子围栏区域内,可以使用车辆位置和中心点球面距离小于等于半径,在电子围栏的区域内。
- 还有一些不规则的电子围栏,这些可以使用射线取点的个数来判断是否在电子围栏内,如果是偶数在电子围栏外,否则是电子围栏内。
电子围栏规则和分析结果数据结构
- 电子围栏的定义
- 电子围栏规则数据结构
- 字段
- 数据样本示例
- 电子围栏分析结果数据结构
- 字段
电子围栏分析步骤
- 电子围栏任务8大步骤
1.电子围栏分析任务设置、原始数据json解析、过滤异常数据
2.读取已存在电子围栏中的车辆与电子围栏信息(广播流临时结果数据)
3.原始车辆数据与电子围栏广播流进行合并,生成电子围栏规则模型流数据(DStream)
4.创建90秒翻滚窗口,计算电子围栏信息(ElectricFenceModel中的值根据车辆是否在围栏内进行设置)
5.读取电子围栏分析结果表数据并广播
6.翻滚窗口电子围栏对象模型流数据与电子围栏分析结果数据广播流进行connect
7.对电子围栏对象模型,添加uuid和inMySQL(车辆是否已存在mysql表中)
8.电子围栏分析结果数据落地mysql,也可以选择落地mongo
电子围栏分析任务实现
- 电子栅栏分析的逻辑图
- 电子围栏分析主类:ElectricFenceTask
- 简化 ItcastDataObj 对象:ItcastDataPartObj.java
- 简化解析 ItcastParseUtil 对象: JsonParsePartUtil.java
- 测试工具类对象
//实现步骤: 1)初始化flink流处理的运行环境(设置按照事件时间处理数据、设置hadoopHome的用户名、设置checkpoint) 2)读取kafka数据源(调用父类的方法) 3)将字符串转换成javaBean(ItcastDataPartObj)对象 4)过滤出来正常数据 5)读取电子围栏规则数据以及电子围栏规则关联的车辆数据并进行广播 6)将原始数据(消费的kafka数据)与电子围栏规则数据进行关联操作(Connect)并flatMap为 ElectricFenceRulesFuntion 7)对上步数据分配水印(30s)并根据 vin 分组后应用90s滚动窗口,然后对窗口进行自定义函数的开发(计算出来该窗口的数据属于电子围栏外还是电子围栏内) 8)读取电子围栏分析结果表的数据并进行广播 9)对第七步和第八步产生的数据进行关联操作(connect) 10)对第九步的结果进行滚动窗口操作,应用自定义窗口函数(实现添加uuid和inMysql属性赋值) 11)将分析后的电子围栏结果数据实时写入到mysql数据库中 12)运行作业,等待停止
广播状态与实现
- 回顾广播变量概念
广播变量就是将变量广播到各个 taskmanager的内存中,可以共享数据,一般情况下广播变量的类型是 map 类型 key->value
- 广播变量的数据格式是——map类型state
- 如何使用广播变量
HashMap ,其中String:vin
- 电子围栏转换临时对象——ElectricFenceResultTmp
@Data @AllArgsConstructor public class ElectricFenceResultTmp { //电子围栏id private int id; //电子围栏名称 private String name; //电子围栏中心地址 private String address; //电子围栏半径 private float radius; //电子围栏中心点的经度 private double longitude; //电子围栏中心点的维度 private double latitude; //电子围栏的开始时间 private Date startTime; //电子围栏的结束时间 private Date endTime; @Override public String toString() { return "ElectricFenceResultTmp{" + "id=" + id + ", name='" + name + '\'' + ", address='" + address + '\'' + ", radius=" + radius + ", longitude=" + longitude + ", latitude=" + latitude + ", startTime=" + startTime + ", endTime=" + endTime + '}'; } }
- 自定义 source 读取 MySQL 的数据源并广播
。定义读取电子围栏规则类——MysqlElectricFenceSouce
返回类型为 HashMap
//读取mysql存储的电子围栏规则表数据以及电子围栏规则关联的电子围栏规则车辆表数据,根据分析,一个车辆可能适配多个电子围栏规则,所以返回的数据类型定义为HashMap<vin, 电子围栏规则对象>,为了方便处理,我们只处理一个车辆关联一个电子围栏规则的场景(真事的业务开发中一定是一个车辆可能有很多很多对应电子围栏规则的)。 //继承 RichSourceFunction<HashMap<String, ElectricFenceResultTmp>> //1.重写 open 方法 //1.1 获取上下文中的 parameterTool //1.2 读取配置文件中,注册驱动 url user password //1.3 实例化statement //2.重写 close 方法 //2.1 关闭 statement 和 conn //3.重写 run 方法 //3.1 每指定时间循环读取 mysql 中的电子围栏规则 //3.2 收集 electricFenceResult 指定休眠时间 //4.重写 cancel 方法
- 读取数据库中配置信息
select vins.vin,setting.id,setting.name,setting.address,setting.radius,setting.longitude,setting.latitude,setting.start_time,setting.end_time from vehicle_networking.electronic_fence_setting setting inner join vehicle_networking.electronic_fence_vins vins on setting.id=vins.setting_id where setting.status=1
电子围栏中的 ConnectStreamed应用
connect流说明
connect流使用场景
两点之间球面距离的计算——DistanceCaculateUtil
- 导入工具jar包坐标
<!-- geodesy地址位置查询依赖 --> <dependency> <groupId>org.gavaghan</groupId> <artifactId>geodesy</artifactId> <version>${geodesy.version}</version> </dependency>
- 两点之间球面距离的计算工具类
/** * TODO 球面距离计算工具类;根据两个点的经纬度,计算出距离 */ public class DistanceCaculateUtil { /** * @desc:计算地址位置方法,坐标系、经纬度用于计算距离(直线距离) * @param gpsFrom * @param gpsTo * @param ellipsoid * @return 计算距离 */ private static Double getDistanceMeter(GlobalCoordinates gpsFrom, GlobalCoordinates gpsTo, Ellipsoid ellipsoid) { // GeodeticCurve geodeticCurve = new GeodeticCalculator().calculateGeodeticCurve(ellipsoid, gpsFrom, gpsTo); return geodeticCurve.getEllipsoidalDistance(); } /** * @desc:使用传入的ellipsoidsphere方法计算距离 * @param latitude 位置1经度 * @param longitude 位置1维度 * @param latitude2 位置2经度 * @param longitude2 位置2维度 * @param ellipsoid 椭圆计算算法 * @return */ private static Double ellipsoidMethodDistance(Double latitude, Double longitude, Double latitude2, Double longitude2, Ellipsoid ellipsoid){ // todo 位置点经度、维度不为空 位置点2经度、维度不为空 椭圆算法 Objects.requireNonNull(latitude, "latitude is not null"); Objects.requireNonNull(longitude, "longitude is not null"); Objects.requireNonNull(latitude2, "latitude2 is not null"); Objects.requireNonNull(longitude2, "longitude2 is not null"); Objects.requireNonNull(ellipsoid, "ellipsoid method is not null"); // todo 地球坐标对象:封装经度维度坐标对象 GlobalCoordinates source = new GlobalCoordinates(latitude, longitude); GlobalCoordinates target = new GlobalCoordinates(latitude2, longitude2); // todo 椭圆范围计算方法 return getDistanceMeter(source, target, ellipsoid); } /** * @desc:使用ellipsoidsphere方法计算距离 * @param latitude * @param longitude * @param latitude2 * @param longitude2 * @return distance 单位:m */ public static Double getDistance(Double latitude,Double longitude,Double latitude2,Double longitude2) { // 椭圆范围计算方法:Ellipsoid.Sphere return ellipsoidMethodDistance(latitude, longitude, latitude2, longitude2, Ellipsoid.Sphere); } }
电子围栏中自定义对象将两个数据流合并
- 通过关联两个数据流后CoFlatMap 后生成实体类—— ElectricFenceModel
/** * 电子围栏规则计算模型 */ @Data @AllArgsConstructor @NoArgsConstructor public class ElectricFenceModel implements Comparable<ElectricFenceModel> { //车架号 private String vin = ""; //电子围栏结果表UUID private Long uuid = -999999L; //上次状态 0 里面 1 外面 private int lastStatus = -999999; //当前状态 0 里面 1 外面 private int nowStatus = -999999; //位置时间 yyyy-MM-dd HH:mm:ss private String gpsTime = ""; //位置纬度-- private Double lat = -999999D; //位置经度-- private Double lng = -999999D; //电子围栏ID private int eleId = -999999; //电子围栏名称 private String eleName = ""; //中心点地址 private String address = ""; //中心点纬度 private Double latitude; //中心点经度 private Double longitude = -999999D; //电子围栏半径 private Float radius = -999999F; //出围栏时间 private String outEleTime = null; //进围栏时间 private String inEleTime = null; //是否在mysql结果表中 private Boolean inMysql = false; //状态报警 0:出围栏 1:进围栏 private int statusAlarm = -999999; //报警信息 private String statusAlarmMsg = ""; //终端时间 private String terminalTime = ""; // 扩展字段 终端时间 private Long terminalTimestamp = -999999L; @Override public int compareTo(ElectricFenceModel o) { if(this.getTerminalTimestamp() > o.getTerminalTimestamp()){ return 1; } else if(this.getTerminalTimestamp() < o.getTerminalTimestamp()){ return -1; }else{ return 0; } } }
- 实现将两个流合并CoFlatMapFunction接口—— ElectricFenceRulesFuntion
//1.定义返回的 ElectricFenceModel //2.判断如果流数据数据质量(车辆的经纬度不能为0或-999999,车辆GpsTime不能为空) //2.1.获取当前车辆的 vin //2.2.通过vin获取电子围栏的配置信息 //2.3.如果电子围栏配置信息不为空 //2.3.1.说明当前车辆关联了电子围栏规则,需要判断当前上报的数据是否在电子围栏规则的生效时间内,先获取上报地理位置时间gpsTimestamp //2.3.2.如果当前gpsTimestamp>=开始时间戳并且gpsTimestamp<=结束时间戳,以下内容存入到 ElectricFenceModel //2.3.2.1.上报车辆的数据在电子围栏生效期内 vin gpstime lng lat 终端时间和终端时间戳 //2.3.2.2.电子围栏id,电子围栏名称,地址,半径 //2.3.2.3.电子围栏经纬度 //2.3.2.4.计算经纬度和电子围栏经纬度距离距离,如果两点之间大于半径(单位是千米)的距离,就是存在于圆外,否则反之 //2.3.2.5.收集结果数据
设置窗口并计算确定是否在电子围栏内告警
- 设置水印机制
- 根据 vin 进行分组
- 创建 90 秒翻滚窗口
- 自定义电子围栏窗口实现类:ElectricFenceWindowFunction
//对电子围栏进行自定义窗口操作,处理电子围栏判断逻辑 //继承 RichWindowFunction<ElectricFenceModel, ElectricFenceModel, String, TimeWindow> //1.定义存储历史电子围栏数据的state,<vin,是否在电子围栏内0:内,1:外> MapState<String, Integer> //2.重写open方法 //2.1 定义mapState的描述器(相当于表结构) <String,Integer> //2.2 获取 parameterTool,用来读取配置文件参数 //2.3 读取状态的超时时间 "vehicle.state.last.period" ,构建ttl设置更新类型和状态可见 //2.4 设置状态描述 StateTtlConfig,开启生命周期时间 //2.5 获取map状态
- apply 方法步骤如下
//1.创建返回对象 //2.对窗口内的数据进行排序 //3.从 state 中获取车辆vin对应的上一次窗口电子围栏lastStateValue标记(车辆上一次窗口是否在电子围栏中)0:电子围栏内 1:电子围栏外 //4.如果上次状态为空,初始化赋值 //5.判断当前处于电子围栏内还是电子围栏外 //5.1.定义当前车辆电子围栏内出现的次数 //5.2.定义当前车辆电子围栏外出现的次数 //6.定义当前窗口的电子围栏状态 //7. 90s内车辆出现在电子围栏内的次数多于出现在电子围栏外的次数,则认为当前处于电子围栏内 //8. 将当前窗口的电子围栏状态写入到 state 中,供下次判断 //9.如果当前电子围栏状态与上一次电子围栏状态不同 //9.1.如果上一次窗口处于电子围栏外,而本次是电子围栏内,则将进入电子围栏的时间写入到数据库中 //9.1.1.过滤出来状态为0的第一条数据 //9.1.2.拷贝属性给 electricFenceModel 并将进入终端时间赋值,并且将状态告警字段赋值为1 0:出围栏 1:进围栏,将数据collect返回 //9.2.如果上一次窗口处于电子围栏内,而本次是电子围栏外,则将出电子围栏的时间写入到数据库中 //9.2.1.过滤出来状态倒序为1的第一条数据 //9.2.2.拷贝属性给 electricFenceModel 并将出终端时间赋值,并且将状态告警 0:出围栏 1:进围栏,将数据collect返回
- 如果判断为进入到电子围栏,进入到电子围栏的第一条数据的时间会被记录下来
合并分析电子围栏结果
读取电子围栏分析结果并广播
- 读取mysql的电子围栏结果表的数据——MysqlElectricFenceResultSource
//读取电子围栏分析结果表的数据,并进行广播 //继承自 RichSourceFunction<HashMap<String, Long>> //1.重写 open 方法,初始化连接 //1.1 编写sql "select vin, min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null GROUP BY vin;" //2.重写 close 方法 //3.重写 run 方法 获取出来vin 和 id 封装成map并返回 //4.重写 cancel 方法
- 读取电子栅栏的 vin 和 最近id
select vin,min(id) id from vehicle_networking.electric_fence where inTime is not null and outTime is null group by vin
- 将读取的电子栅栏信息数据流广播出去
窗口流数据与广播流数据连接
- 将电子栅栏模型数据流和电子栅栏 获取的流进行关联,并进行 flatMap
- 实现电子围栏分析结果模型添加 uuid 和 inMysql 字段 —— ElectricFenceModelFunction
//实现 CoFlatMapFunction<ElectricFenceModel, HashMap<String, Long>, ElectricFenceModel> //1.重写flatMap1方法 //1.1.通过getvin获取配置流中是否存在值 //2.如果不为 null //2.1.设置为当前时间戳 //2.2.设置库中InMysql是否存在为 true //3.否则 //3.1.设置 uuid 为最大值-当前时间戳 //3.2 设置库中是否存在为 false //4.收集数据 //5.重写 flatMap2 方法 //5.1.读取配置数据
电子围栏分析结果入库
- 将电子围栏分析结果数据写入到 mysql 数据库中 —— ElectricFenceMysqlSink
//继承于 RichSinkFunction<ElectricFenceModel> //1. 重写 open 方法,获取参数,创建连接 //2. 重写 invoke 方法, //2.1 出围栏(且能获取到进围栏状态的)则修改进围栏的状态, 否则 进入围栏,转换ElectricFenceModel对象,插入结构数据到电子围栏结果表 //3. 重写 close 方法