实时即未来,车联网项目之phoenix on hbase 即席查询【四】

本文涉及的产品
阿里云百炼推荐规格 ADB PostgreSQL,4核16GB 100GB 1个月
简介: 实时即未来,车联网项目之phoenix on hbase 即席查询【四】

Phoenix的介绍和客户端命令


  • Phoenix的概念


  • Phoenix on HBase 和 Hive on HDFS 的区别


  • Phoenix 客户端命令操作及说明


  • 查看表操作


① TABLE_SCHEMA



         


② 对应 HBase 的 namespace



         


  • 创建表的操作



         


  • 新增数据



         


  • 查询操作



         


  • 更新操作



         


  • 删除操作



         


构建HBase的二级索引


  • 为什么需要构建二级索引


  • 索引的类型



概念:


创建:



         


查看:



         


删除:



         



概念:


创建:



         


查看:



         


删除:



         



概念:


创建:



         


查看:



         


删除:



         



概念:


创建:



         


查看:



         


删除:



         


  • 本地索引和全局索引的比较


  • 索引的优化


原始数据itcast_src构建二级索引


  • 创建 HBase 表对应的 phoenix 表


  • 创建 HBase 表对应的 phoenix 视图


phoenix 使用场景





车辆常用字段明细数据 ETL


  • 常用字段


  • 创建 HBase 车辆指标即席查询表



         


  • 在 ETL 主任务中添加常用字段落地逻辑


。srcDataStream.addSink(new VehicleDetailSinkOptimizer(“itcastsrc_vehicle_detail”));


。编写核心业务逻辑 实现 RichSinkFunction


private Put setDataSourcePut(ItcastDataObj itcastDataObj) {
        //确定rowkey
        String rowKey = itcastDataObj.getVin() + StringUtil.reverse(itcastDataObj.getTerminalTimeStamp().toString());
        Put put = new Put(Bytes.toBytes(rowKey));
        //设置需要写入的列有那些
        //这两个列一定不为空,如果为空就不是正常数据了
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(itcastDataObj.getVin()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("terminalTime"), Bytes.toBytes(itcastDataObj.getTerminalTime()));
        //电量百分比(currentElectricity)、当前电量(remainPower)、百公里油耗(fuelConsumption100km)、
        // 发动机速度(engineSpeed)、车辆速度(vehicleSpeed)
        if(itcastDataObj.getCurrentElectricity() != -999999D){
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("currentElectricity"), Bytes.toBytes(itcastDataObj.getCurrentElectricity()));
        }
        if(itcastDataObj.getRemainPower() != -999999D){
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("remainPower"), Bytes.toBytes(itcastDataObj.getRemainPower()));
        }
        if(StringUtils.isNotEmpty(itcastDataObj.getFuelConsumption100km()) ){
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("fuelConsumption100km"), Bytes.toBytes(itcastDataObj.getFuelConsumption100km()));
        }
        if(StringUtils.isNotEmpty(itcastDataObj.getEngineSpeed()) ){
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("engineSpeed"), Bytes.toBytes(itcastDataObj.getEngineSpeed()));
        }
        if(itcastDataObj.getVehicleSpeed() != -999999D){
            put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vehicleSpeed"), Bytes.toBytes(itcastDataObj.getVehicleSpeed()));
        }
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        //返回put对象
        return  put;
    }


  • 在 phoenix 中创建车联指标即席查询明细视图



         


车辆明细数据统计


车辆总数统计


  • 统计车辆明细总数



         


  • 统计每天在线的车辆总数



         


车辆电量统计


  • 统计车辆电量百分比



         


  • 统计当前电量



         


车辆油耗统计


  • 统计车辆百公里油耗



         


车辆速度统计


  • 统计车辆发动机速度



         


  • 统计车辆速度vehicleSpeed,求最大、最小、平均车速



         


车辆数据统计意义分析


Zeppelin简介


  • zeppelin应用场景
  • zeppelin安装介绍


zeppelin UI介绍


zeppelin整合mysql


zeppelin整合hive


车辆电量、速度数据统计报表


  • 车辆电量统计报表
  • 车辆速度统计报表

问题


  • 如何删除topic并初始化数据


步骤:
1.检查 server.properties 配置文件中 delete.topic.enable=true,三台都要设置重启集群。
2.删除kafka中topic vehicledata
[root@node01 kafka]# bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --delete --topic vehicledata
# 使用 kafka-tools 客户端删除topic
3.打开 zkCli.sh 删除三组配置
  rmr /brokers/topics/vehicledata
  rmr /config/topics/vehicledata
  rmr /admin/delete_topics/vehicledata
4.如果kafka集群没有关闭,关闭kafka集群
5.清空log.dirs=/export/data/kafka/kafka-logs目录就是kafka集群的数据目录
rm -rf /export/data/kafka/kafka-logs/*
6.重启kafka集群
7.创建topic vehicledata
[root@node01 kafka]# bin/kafka-topics.sh --zookeeper node01:2181 --create --topic vehicledata --partitions 3 --replication-factor 2
相关实践学习
阿里云百炼xAnalyticDB PostgreSQL构建AIGC应用
通过该实验体验在阿里云百炼中构建企业专属知识库构建及应用全流程。同时体验使用ADB-PG向量检索引擎提供专属安全存储,保障企业数据隐私安全。
AnalyticDB PostgreSQL 企业智能数据中台:一站式管理数据服务资产
企业在数据仓库之上可构建丰富的数据服务用以支持数据应用及业务场景;ADB PG推出全新企业智能数据平台,用以帮助用户一站式的管理企业数据服务资产,包括创建, 管理,探索, 监控等; 助力企业在现有平台之上快速构建起数据服务资产体系
目录
相关文章
|
存储 分布式数据库 数据库
Django项目中使用Hbase的方法
Django项目中使用Hbase的方法
137 0
|
SQL 存储 分布式数据库
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
【通过Hive清洗、处理和计算原始数据,Hive清洗处理后的结果,将存入Hbase,海量数据随机查询场景从HBase查询数据 】
247 0
|
存储 SQL 自然语言处理
基于 HBase 的海量数据查询与检索解析|学习笔记
快速学习基于 HBase 的海量数据查询与检索解析
基于 HBase 的海量数据查询与检索解析|学习笔记
|
1月前
|
存储 分布式计算 Hadoop
Hadoop-33 HBase 初识简介 项目简介 整体架构 HMaster HRegionServer Region
Hadoop-33 HBase 初识简介 项目简介 整体架构 HMaster HRegionServer Region
50 2
|
11月前
|
缓存 分布式数据库 API
hbase查询速度很慢
hbase查询速度很慢
581 1
|
6月前
|
SQL 分布式数据库 HIVE
Hbase二级索引_Hive on Hbase 及phoenix详解
Hbase二级索引_Hive on Hbase 及phoenix详解
77 0
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
211 0
Spark查询Hbase小案例
|
SQL 缓存 分布式计算
HBase查询一张表的数据条数的方法
HBase查询一张表的数据条数的方法
1017 0
HBase查询一张表的数据条数的方法
|
分布式数据库 Hbase
|
分布式数据库 Hbase