实时即未来,车联网项目之将数据落地到文件系统和数据库【三】

简介: 批量写入需要使用的缓存对象 - BufferedMutator 写数据的原理将数据按批次写入到 BufferedMutator 对象中,按时间或者按大小写入。

实时读取流数据的步骤


080494e84d5ed193ec03ddcd8837decd.png


原始数据实时ETL任务分析 Hive


将HDFS数据映射到Hive表


  • 需要指定的HDFS的目录


c0595bf95c7530debbbe0a8f735a56ba.png


  • 回忆如何映射HDFS数据到Hive表中


① 创建表 create external table maynor_src (…) row formate delimited field terminate by ‘\t’ partitioned by(dt string) location ‘hdfs://node01…/maynor_src’;


② 使用数据库


③ 添加文件夹到指定分区


alter table maynor_src add partition(dt=‘20210922’) location ‘hdfs://node01:8020/apps/warehouse/ods.db/maynor_src/20210922’


#!/bin/bash
dt=`date -d '1 days ago' +'%Y%m%d'`
tableName=$1
ssh node03 `/export/server/hive/bin/hive -e "use maynor_ods;alter table ${tableName} add partition(dt=${dt}) location 'hdfs://node01:8020/apps/warehouse/ods.db/${tableName}/${dt}"`


  • 如何实现从HDFS中正确或错误的数据映射到Hive表中





  • 如何自动化HDFS数据到Hive表中


# 使用shell 脚本
alter table maynor_src add partition (dt="20210922") location "/apps/hive/warehouse/ods.db/maynor_src/20210922";


  • 如何执行 t+1 离线任务,设置调度的两种方式


① crontab


linux 自带调度


② 调度平台


azkaban airflow dolphinscheduler oozie 自研


自定义Sink数据写入Hive表(了解)


  • 实现步骤


package cn.maynor.streaming.sink;
import cn.maynor.streaming.entity.maynorDataObj;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
/**
 * Author maynor
 * Date 2021/9/22 10:02
 * Desc 将每条车辆的数据直接写入到 Hive 中
 */
public class SaveErrorDataHiveSink extends RichSinkFunction<maynorDataObj> {
    //定义 logger
    private static final Logger logger = LoggerFactory.getLogger(SaveErrorDataHiveSink.class);
    //2.创建有参构造方法,参数包括数据库名和表名
    //定义变量
    private String dbName;
    private String tableName;
    //定义连接对象和statement对象
    private Connection conn = null;
    private Statement statement = null;
    //构造方法
    public SaveErrorDataHiveSink(String _dbName,String _tableName){
        this.dbName = _dbName;
        this.tableName = _tableName;
    }
    //3.重写open方法进行Hive连接的初始化
    @Override
    public void open(Configuration parameters) throws Exception {
        //3.1 将JDBC驱动 org.apache.hive.jdbc.HiveDriver 加载进来
        //获取全局参数
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        //获取当前上下文中 hive 的驱动
        Class.forName(parameterTool.getRequired("hive.driver"));
        //3.2 设置JDBC连接Hive的连接器,端口为10000
        conn = DriverManager.getConnection(
                parameterTool.getRequired("hive.url"),
                parameterTool.getRequired("hive.user"),
                parameterTool.get("hive.password")
        );
        //3.3 创建Statement
        statement = conn.createStatement();
        //3.4 定义 schemaAndTableExists 实现库不存在创建库,表不存在创建表
        Boolean flag = schemaAndTableExists(dbName,tableName,statement);
        if(flag){
            logger.info("当前数据库和表初始化成功!");
        }else{
            logger.warn("请检查数据库和表!");
        }
    }
    //5.重写cloese方法 关闭连接
    @Override
    public void close() throws Exception {
        if(!statement.isClosed())statement.close();
        if(!conn.isClosed())conn.close();
    }
    //4.重写invoke将每条数据
    @Override
    public void invoke(maynorDataObj value, Context context) throws Exception {
        //4.1 编写SQL将数据插入到表中
        // insert into maynor_error values('11111');
        StringBuffer buffer = new StringBuffer();
        buffer.append("INSERT INTO "+tableName);
        buffer.append(" VALUES('");
        buffer.append(value.getErrorData()+"'");
        //4.2 执行statement.executeUpdate 将数据直接落地到Hive表
        statement.executeUpdate(buffer.toString());
    }
    //6.定义 schemaAndTableExists 方法 create database if not exists库或表, execute,选择数据库
    /**
     * 初始化数据库和数据表,如果初始化成功返回 true,否则 false
     * @param dbName
     * @param tableName
     * @param statement
     * @return
     */
    private Boolean schemaAndTableExists(String dbName, String tableName, Statement statement) {
        //数据库是否存在
        Boolean flag = true;
        try{
            //初始化数据库
            String createDBSQL="create database if not exists "+dbName;
            boolean executeDB = statement.execute(createDBSQL);
            if(executeDB){
                logger.info("当前数据库创建成功");
                flag = true;
            }else{
                logger.info("当前数据库已经存在");
                flag = true;
            }
            //初始化数据表
            String createTableSQL = "use "+tableName+";create table if not exists "+tableName+" (json string) partition by dt" +
                    " row formatted delimited field terminate by '\t' location '/apps/hive/warehouse/ods.db/maynor_error'";
            boolean executeTable = statement.execute(createTableSQL);
            if(executeTable){
                logger.info("当前数据库表创建成功");
                flag = true;
            }else{
                logger.info("当前数据表已经存在");
                flag = true;
            }
        }catch (Exception ex){
            logger.warn("初始化失败!");
            flag = false;
        }
        return flag;
    }
}


原始数据实时ETL落地到HBase


  • 写入hbase的步骤和准备


1.写入的表名

2.hbase的rowkey

3.写入的列簇 columnFamily

4.列名和列值


HBase的rowkey设计原则


① rowkey 的长度原则 , 16个字节


② rowkey 的散列原则 ,尽量保证离散


③ rowkey 的唯一原则 , rowkey不要一样


HBase的rowkey设计方法


① 加盐 —— 随机数


② Hash散列


③ 翻转字符串


正常数据落地到HBase


  • 开启 HBase 集群


# 首先开启 hdfs ,zookeeper 
/export/server/hbase/start-hbase.sh


  • 进入到 HBase命令行


hbase shell


  • 创建HBase表 - maynor_src ,列簇为 cf


# 查看hbase所有表
list
# 查看namespace(数据库)
list_namespace
# 创建数据表
hbase(main):005:0> create 'maynor_src','cf'
# 查看表中的数据
scan 'maynor_src'


  • 开发步骤


//1.创建 SrcDataToHBaseSink类继承 RichSinkFunction<maynorDataObj>
//2.创建一个有参数-表名的构造方法
//3.重写open方法
//3.1 从上下文获取到全局的参数
//3.2 设置hbase的配置,Zookeeper Quorum集群和端口和TableInputFormat的输入表
//3.3 通过连接工厂创建连接
//3.4 通过连接获取表对象
//4.重写close方法
//4.1 关闭hbase 表和连接资源
//5. 重写 invoke 方法,将读取的数据写入到 hbase
//5.1 setDataSourcePut输入参数value,返回put对象
//6. 实现 setDataSourcePut 方法
//6.1 如何设计rowkey VIN+时间戳翻转
//6.2 定义列簇的名称
//6.3 通过 rowkey 实例化 put
//6.4 将所有的字段添加到put的字段中


原始数据实时 ETL 任务 HBase 调优


数据写入HBase优化 - 客户端优化


  • 为什么需要优化呢?


防止出现每条数据都读写 HBase 数据库,造成集群宕机和数据丢失。


  • 批量写入需要使用的缓存对象 - BufferedMutator 写数据的原理


将数据按批次写入到 BufferedMutator 对象中,按时间或者按大小写入。


  • 代码逻辑优化


BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        params.writeBufferSize(10 * 1024 * 1024L);
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
//5.1 setDataSourcePut输入参数value,返回put对象
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            //5.2 指定时间内的数据强制刷写到hbase
            mutator.flush();
        }catch (Exception ex){
            logger.error("写入到hbase失败:"+ex.getMessage());
        }


  • 在主流程中将数据写入到 maynor_src


数据写入HBase预分区


  • 预分区的概念


  • 创建预分区的语法


数据写入HBase预写日志


  • 预写日志的作用


  • memstore在HBase读写作用


数据写入HBase使用压缩和编码


  • 编码压缩其实是对列数据的压缩


  • 编码压缩的优势





  • 编码类型


  • 创建一个 fast_diff 编码的 maynor_src 表


alter 'maynor_src', { NAME => 'cf', DATA_BLOCKs_ENCODING => 'FAST_DIFF' }


  • 压缩算法


  • 创建一个 gz 或 snappy 压缩的 maynor_src_gz 表


create 'maynor_src',{NAME => 'cf',COMPRESSION => 'gz'}create 'maynor_src_snappy', { NAME => 'cf', COMPRESSION => 'SNAPPY' }


  • 查看数据量大小
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
18天前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
20天前
|
SQL 关系型数据库 数据库
国产数据实战之docker部署MyWebSQL数据库管理工具
【10月更文挑战第23天】国产数据实战之docker部署MyWebSQL数据库管理工具
60 4
国产数据实战之docker部署MyWebSQL数据库管理工具
|
17天前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。
|
25天前
|
人工智能 Cloud Native 容灾
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
云数据库“再进化”,OB Cloud如何打造云时代的数据底座?
|
28天前
|
SQL JavaScript 关系型数据库
node博客小项目:接口开发、连接mysql数据库
【10月更文挑战第14天】node博客小项目:接口开发、连接mysql数据库
|
1月前
|
SQL 存储 关系型数据库
数据储存数据库管理系统(DBMS)
【10月更文挑战第11天】
88 3
|
26天前
|
NoSQL 前端开发 MongoDB
前端的全栈之路Meteor篇(三):运行在浏览器端的NoSQL数据库副本-MiniMongo介绍及其前后端数据实时同步示例
MiniMongo 是 Meteor 框架中的客户端数据库组件,模拟了 MongoDB 的核心功能,允许前端开发者使用类似 MongoDB 的 API 进行数据操作。通过 Meteor 的数据同步机制,MiniMongo 与服务器端的 MongoDB 实现实时数据同步,确保数据一致性,支持发布/订阅模型和响应式数据源,适用于实时聊天、项目管理和协作工具等应用场景。
|
9天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
24 1
|
11天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
26 4
|
18天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
85 1