实时即未来,车联网项目之车辆驾驶行为分析【五】

简介: 单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。

驾驶行程分析定义和业务逻辑


驾驶行程的概念


一个完整意义的驾驶的行程,定义 15 分钟作为一个完整的行程,15分钟是停车前最后一条数据和驾驶前第一条数据之间间隔15分钟


818fe57c8a3d07e77da7b709b129ff54.png


驾驶行程分析业务的价值,分析什么指标


单次行驶里程区间分布、单次行程消耗soc区间分布、最大里程分布、充电行程占比、平均行驶里程分布、周行驶里程分布、最大行驶里程分段统计、常用行驶里程、全国-每日平均行驶里程(近4周)、全国-单车日均行驶里程分布(近一年)、各车系单次最大行驶里程分布、不同里程范围内车辆占比情况。


驾驶行程分析流程


1.创建流执行环境
2.获取kafka中的数据
3.将json字符串解析成车辆数据对象
4.过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3
0x01:停车充电。0x02:行驶充电。0x03:未充电状态。0x04:充电完成。0xFE:异常。0xFF:无效。
5.分配水印机制,设置最大延迟时间 30s
6.超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
7.对车辆数据进行分组,创建会话窗口
8.数据的采样分析
8.1.应用窗口,数据的采样分析
8.2.将分析的采样数据封装成数组,并将其保存到HBase中
9.数据的行程分析
9.1.应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
9.2.将这个对象保存到hbase中
10.执行流环境任务


驾驶行程分析任务设置


核心主类创建


  • 任务名称—— TripDriveTask


  • 继承 BaseTask


  • 获取流执行环境


  • 读取 Kafka 数据源


  • 数据的实时 ETL 操作,解析、转换,过滤出来正常的数据,过滤出来驾驶行程数据


chargeStatus=2 or chargeStatus=3


  • 创建行程划分window


。分配水印机制


1182e9f36dae12123eb7824d03bb92e7.png


  • 根据 vin 进行分组
  • 指定window为 SessionWindow
  • 驾驶行程采样分析入库
  • 驾驶行程划分与入库


实现步骤:
1:初始化flink流处理的运行环境(checkpoint设置、事件时间处理数据、并行度等等)
2:接入kafka数据源将数据读取到返回(指定topic、集群地址、消费者id等等)
3:将消费到的json字符串转换成itcastDataObj
4:过滤出来驾驶行程相关的数据,行车充电(chargeStatus=2)、未充电(=3)
5:对驾驶行程数据应用水印(允许数据延迟30s)
6:对加了水印的数据进行分组操作,应用窗口操作(session窗口)
 6.1:车辆在驾驶行程中如果超过15m没有上报数据,可以认为是上一个行程的结束
7:驾驶行程采样数据的业务开发
 7.1:对数据应用自定义窗口的业务逻辑处理(划分四类数据:5s、10s、15s、20s)
 7.2:将自定义驾驶行程采样业务处理后的数据写入到hbase中
8:驾驶行程数据的业务开发
 8.1:驾驶行程数据应用自定义窗口的业务逻辑处理
 8.2:将自定义驾驶行程业务处理后的数据写入到hbase中
9:递交作业


知识点回顾——水印机制


  • 水印机制解决数据延迟和乱序问题


知识点回顾——窗口分配


  • BoundedOutofTimeExtractor ,抽取出来最大容忍延迟时间,指定事件时间


驾驶行程采样入库


  • 采样逻辑
  • 行程划分水印
  • 根据 vin 进行分组
  • 创建会话窗口
  • 处理行程划分,指定window function


。引入 guava 库中的Lists类,创建一个可变的包含给定元素的ArrayList示例


<!-- google guava开发工具依赖-->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>${guava.version}</version>
</dependency>


。引入 window function,处理行程划分采样逻辑—— DriveSampleWindowFunction


//创建驾驶行程采集数据自定义函数开发类 窗口内数据按照5m,10m,20m维度进行数据的收集和分析,此类继承于RichWindowFunction 抽象类
//1.重写 apply 方法
//1.1 将迭代器转换成集合列表
//1.2 对集合列表的数据进行排序操作
//1.3 首先要获取排序后的第一条数据从而得到周期(5m等)的开始时间
//1.4 采样数据的soc(剩余电量百分比)、mileage(累计里程)、speed(车速)、gps(经度+维度)、terminalTime(终端时间)字段属性需要拼接到一个字符串返回
//soc(剩余电量百分比)
//mileage(累计里程)
//speed(车速)
//gps:地理位置
//terminalTime:终端时间
//1.5 获取排序后的最后一条数据作为当前窗口的最后一条数据
//1.6 获取窗口的第一条数据的终端时间作为开始时间戳
//1.7 获取窗口的最后一条数据的终端时间作为结束时间戳
//1.8 遍历窗口内的每条数据,计算5m采样周期内的数据
//1.9 创建字符串数组类型用于存储采集到的车辆唯一编码,终端时间戳,剩余电量,总里程数,车速,地理gps,终端时间
//1.10 返回数据


  • 驾驶行程采样入库 hbase

。创建 HBase 的表空间 : TRIPDB


create_namespace 'TRIPDB'
list_namespace


。创建入采样入库的表 : trip_sample,压缩方式是:snappy


create "TRIPDB:trip_sample",{ NAME => 'cf', COMPRESSION => 'SNAPPY' }


。自定义sink类用于保存采样数据—— TripSampleToHBaseSink


//将数据保存到 TRIPDB:trip_sample 表
//数组对象生成Put对象
//通过车辆唯一编码+终端时间作为rowkey
//将soc,mileage,speed,gps,terminalTime,processTime=当前日期时间字符串 封装成put


驾驶行程入库


  • 使用之前已经生成的按 vin 分组并分配水印的车辆数据
  • 引入 TripModel 对象,用于接收返回的数据


import cn.itcast.utils.DateUtil;
import lombok.Data;
/**
 * 定义驾驶行程计算结果对应的JavaBean对象
 */
@Data
public class TripModel {
    //车架号
    private String vin = "";
    //上次报文soc
    private Double lastSoc =  -999999D;
    //上次报文里程数
    private Double lastMileage = -999999D;
    //行程开始时间
    private String tripStartTime = "";
    //行程开始soc
    private int start_BMS_SOC =  -999999;
    //行程开始经度
    private Double start_longitude = -999999D;
    //行程开始纬度
    private Double start_latitude =  -999999D;
    //行程开始里程
    private Double start_mileage = -999999D;
    //结束soc
    private int end_BMS_SOC =  -999999;
    //结束经度
    private Double end_longitude =  -999999D;
    //结束纬度
    private Double end_latitude =  -999999D;
    //结束里程
    private Double end_mileage =  -999999D;
    //行程结束时间
    private String tripEndTime = "" ;
    //行程里程消耗
    private Double mileage =  -999999D;
    //最高行驶车速
    private Double max_speed =  0D;
    //soc消耗
    private Double soc_comsuption =  0D;
    //行程消耗时间(分钟)
    private Double time_comsuption =  -999999D;
    //总低速的个数
    private Long total_low_speed_nums = 0L;
    //总中速的个数
    private Long total_medium_speed_nums =  0L;
    //总高速个数
    private Long total_high_speed_nums =  0L;
    //低速soc消耗
    private Double Low_BMS_SOC =  0D;
    //中速soc消耗
    private Double Medium_BMS_SOC =  0D;
    //高速soc消耗
    private Double High_BMS_SOC =  0D;
    //低速里程
    private Double Low_BMS_Mileage =  0D;
    //中速里程
    private Double Medium_BMS_Mileage =  0D;
    //高速里程
    private Double High_BMS_Mileage =  0D;
    //是否为异常行程 0:正常行程 1:异常行程(只有一个采样点)
    private int tripStatus = -999999;
    /**
     * 将驾驶行程计算结果数据保存到hdfs时候需要转换成可以被hive所识别的字符串格式
     * @return
     */
    public String toHiveString() {
        StringBuilder resultString = new StringBuilder();
        if (this.vin != "") resultString.append(this.vin).append("\t"); else resultString.append("NULL").append("\t");
        if (this.tripStartTime != "") resultString.append(this.tripStartTime).append("\t"); else resultString.append("NULL").append("\t");
        if (this.tripEndTime != "") resultString.append(this.tripEndTime).append("\t"); else resultString.append("NULL").append("\t");
        if (this.lastSoc !=  -999999 ) resultString.append(this.lastSoc).append("\t"); else resultString.append("NULL").append("\t");
        if (this.lastMileage !=  -999999 ) resultString.append(this.lastMileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.start_BMS_SOC !=  -999999 ) resultString.append(this.start_BMS_SOC).append("\t"); else resultString.append("NULL").append("\t");
        if (this.start_longitude !=  -999999 ) resultString.append(this.start_longitude).append("\t"); else resultString.append("NULL").append("\t");
        if (this.start_latitude !=  -999999 ) resultString.append(this.start_latitude).append("\t"); else resultString.append("NULL").append("\t");
        if (this.start_mileage !=  -999999 ) resultString.append(this.start_mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.end_BMS_SOC !=  -999999 ) resultString.append(this.end_BMS_SOC).append("\t"); else resultString.append("NULL").append("\t");
        if (this.end_longitude !=  -999999 ) resultString.append(this.end_longitude).append("\t"); else resultString.append("NULL").append("\t");
        if (this.end_latitude !=  -999999 ) resultString.append(this.end_latitude).append("\t"); else resultString.append("NULL").append("\t");
        if (this.end_mileage !=  -999999 ) resultString.append(this.end_mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.mileage !=  -999999 ) resultString.append(this.mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.max_speed !=  -999999 ) resultString.append(this.max_speed).append("\t"); else resultString.append("NULL").append("\t");
        if (this.soc_comsuption !=  -999999 ) resultString.append(this.soc_comsuption).append("\t"); else resultString.append("NULL").append("\t");
        if (this.time_comsuption !=  -999999 ) resultString.append(this.time_comsuption).append("\t"); else resultString.append("NULL").append("\t");
        if (this.total_low_speed_nums !=  -999999 ) resultString.append(this.total_low_speed_nums).append("\t"); else resultString.append("NULL").append("\t");
        if (this.total_medium_speed_nums !=  -999999 ) resultString.append(this.total_medium_speed_nums).append("\t"); else resultString.append("NULL").append("\t");
        if (this.total_high_speed_nums !=  -999999 ) resultString.append(this.total_high_speed_nums).append("\t"); else resultString.append("NULL").append("\t");
        if (this.Low_BMS_SOC !=  -999999 ) resultString.append(this.Low_BMS_SOC).append("\t"); else resultString.append("NULL").append("\t");
        if (this.Medium_BMS_SOC !=  -999999 ) resultString.append(this.Medium_BMS_SOC).append("\t"); else resultString.append("NULL").append("\t");
        if (this.High_BMS_SOC !=  -999999 ) resultString.append(this.High_BMS_SOC).append("\t"); else resultString.append("NULL").append("\t");
        if (this.Low_BMS_Mileage !=  -999999 ) resultString.append(this.Low_BMS_Mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.Medium_BMS_Mileage !=  -999999 ) resultString.append(this.Medium_BMS_Mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.High_BMS_Mileage !=  -999999 ) resultString.append(this.High_BMS_Mileage).append("\t"); else resultString.append("NULL").append("\t");
        if (this.tripStatus !=  -999999 ) resultString.append(this.tripStatus).append("\t"); else resultString.append("NULL").append("\t");
        resultString.append(DateUtil.getCurrentDateTime());
        return resultString.toString();
    }
}


  • 引入 window function,驾驶行程数据的实时计算逻辑—— DriveTripWindowFunction


//创建驾驶行程窗口内数据的自定义实时计算逻辑,此类继承于RichWindowFunction 抽象类
//1.重写 apply 方法
//1.1 将迭代器转换成集合列表
//1.2 对集合列表的数据进行排序操作
//1.3 将集合对象转换成 TripModel 对象返回
//1.4 将 TripModel 对象收集返回
//2.将驾驶行程指标计算getTripModel,得到 TripModel 对象
//2.1 获取第一条数据
//2.2 将 vin(车架号) ,tripStartTime(行程开始时间),start_BMS_SOC(行程开始Soc),start_longitude(行程开始经度),start_latitude(行程开始维度),start_mileage(行程开始表显里程数) 传递给 tripModel 对象。
//2.3 从最后一条数据中得到对象并赋值给 tripEndTime(行程结束时间),end_BMS_SOC(行程结束soc),end_longitude(行程结束经度),end_latitude(行程结束维度),end_mileage(行程结束表显里程数),mileage(行程驾驶公里数),time_comsuption(行程消耗时间)、这里存储的是分钟数,lastSoc(上次的行程Soc)、将当前行程开始的电量消耗百分比作为上一个行程结束的电量消耗百分比,lastMileage(上次的里程数)
//2.4 遍历 itcastDataObj list获取一下内容
// 每条数据的速度
    //获取上次行程报文的soc
    //计算每条数据的soc与lastSoc进行比较差值 socDiff
    //如果socDiff大于0,那么赋值给 soc_comsuption
    //如果speed大于tripModel对象中保存的最大车速并且小于150,将speed保存为最大速度
    //低速行驶 speed >=0 && <40,低速行驶个数+1;low_BMS_SOC=low_BMS_SOC+(当次油耗和上次油耗之差);setLow_BMS_Mileage=setLow_BMS_Mileage+(当次里程和上次里程之差)
    //中速行驶 >=40 && <80
    //高速行驶 >80 && <150
    //设置当前的soc作为 lastSoc 上次的soc 和将当次的里程作为 lastMileage 上次的里程
    //增加扩展字段,判断当前是否有异常数据,如果列表长度大于1说明是正常行程0,否则是异常行程1


  • 驾驶行程数据写入到 HBase —— TripDriveToHBaseSink

。创建驾驶行程的表 : trip_division ,压缩方式是:snappy


create "TRIPDB:trip_division",{ NAME => 'cf', COMPRESSION => 'SNAPPY' }


。自定义sink类用于保存采样数据——TripDivisionHBaseSink


//将数据保存到表 TRIPDB:trip_division
//数组对象生成Put对象
//通过车辆唯一编码+行程开始时间作为rowkey
//将车辆行程的字段分别写入进来 ...


驾驶行程指标分析


  • 在 phoenix 创建行程采样视图


CREATE VIEW TRIPDB."trip_sample" ("rowNum" varchar PRIMARY KEY, "cf"."soc" varchar, "cf"."mileage" varchar, "cf"."speed" varchar, "cf"."gps" varchar, "cf"."terminalTime" varchar, "cf"."processTime" varchar);


  • 在 MySQL中创建表 - t_sample_result 用于前端展示


create table vehicle_networking.t_sample_result(   
    id      int auto_increment comment '主键' primary key,   
    name     varchar(25) not null comment '采样样本指标名称',   
    totalNum   int     not null comment '样本总数',   
    processTime varchar(25) not null comment '样本总数计算时间' 
) comment '采样样本指标统计结果表';


  • 添加依赖,用于编写 phoenix jdbc 读写数据工具类


<dependency>   
    <groupId>org.apache.phoenix</groupId>   
    <artifactId>phoenix-core</artifactId>   
    <version>${phoenix.version}</version>   
    <exclusions>     
        <exclusion>       
            <groupId>org.glassfish</groupId>       
            <artifactId>javax.el</artifactId>     
            </exclusion>   
        </exclusions> 
</dependency>


  • 导入 PhoenixJDBCUtil.java 工具类


  • 导入 JDBCUtil.java 工具类


  • 导入车辆采样分析的 分析类 TripSamplePhoenixAnalysis .java


  • 指标查询


。里程、soc、行程消耗时间分析

。高速、中速、低速soc消耗分析

。高速、中速、低速里程分析

。高速、中速、低速车次分析

。驾驶行程剩余指标业务

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5月前
|
存储 数据可视化 数据挖掘
实时数据分析系统的构建与优化
【7月更文挑战第29天】实时数据分析系统的构建与优化是一个复杂而细致的过程,需要从需求分析、数据源确定、数据采集与传输、数据处理与分析、数据存储、数据可视化、系统部署与配置、监控与优化等多个方面进行综合考虑。通过选择合适的技术栈和优化策略,可以构建出高效、稳定的实时数据分析系统,为企业决策提供强有力的支持。
|
7月前
|
存储 消息中间件 SQL
分钟级实时数据分析的背后——实时湖仓产品解决方案
袋鼠云在结合当前数据湖技术的基础上,建设实时湖仓平台,满足客户“快、精、准”的数据需求。本文将详细介绍实时湖仓产品解决方案,让企业能够更专注地去解决他们的业务价值。
160 0
|
7月前
|
存储 数据采集 监控
智慧工地整体方案,实现现场各类工况数据采集、存储、分析与应用
“智慧工地整体方案”以智慧工地物联网云平台为核心,基于智慧工地物联网云平台与现场多个子系统的互联,实现现场各类工况数据采集、存储、分析与应用。通过接入智慧工地物联网云平台的多个子系统板块,根据现场管理实际需求灵活组合,实现一体化、模块化、智能化、网络化的施工现场过程全面感知、协同工作、智能分析、风险预控、知识共享、互联互通等业务,全面满足建筑施工企业精细化管理的业务需求,智能化地辅助建筑施工企业进行科学决策,促进施工企业监管水平的全面提高。
379 0
|
SQL 分布式计算 调度
开源大数据分析实验(1)——简单用户画像分析之采集数据
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
|
数据采集 传感器 物联网
数据采集技术基础
数据采集技术基础
566 1
数据采集技术基础
|
机器学习/深度学习 人工智能 分布式计算
离线实时一体化新能力解读| 学习笔记
快速学习离线实时一体化新能力解读
870 0
离线实时一体化新能力解读| 学习笔记
EMQ
|
数据采集 存储 人工智能
高效数据通道支撑生产情况实时分析与可视化
EMQ生产数据可视化解决方案海量保障生产数据传输和持久化的实时性、可靠性、安全性,为大数据分析、人工智能应用提供良好数据基础。
EMQ
189 0
高效数据通道支撑生产情况实时分析与可视化
|
消息中间件 SQL 运维
如何设计实时数据平台(技术篇)
本文从技术角度入手,介绍RTDP的技术选型和相关组件,探讨适用不同应用场景的相关模式。
|
存储 NoSQL 关系型数据库
实时即未来,车联网项目之远程诊断实时故障分析【七】
geohash 就是将地图上位置(经纬度)转换成偶数位是经度、奇数数是维度,新的二进制字节,转换成字符串,用字符串代表某一个地理位置。
583 0
|
SQL 数据采集 运维
《实时数仓助力互联网实时决策和精准营销》|学习笔记
快速学习《实时数仓助力互联网实时决策和精准营销》
255 0