离线数仓(六)【ODS 层开发】

本文涉及的产品
云解析 DNS,旗舰版 1个月
日志服务 SLS,月写入数据量 50GB 1个月
全局流量管理 GTM,标准版 1个月
简介: 离线数仓(六)【ODS 层开发】

前言

       今天开始正式的数仓搭建,所谓 ODS 层的工作就是把我们各种数据源采集发送来的各种类型的数据(Json、tsv类型)映射到 Hive 表中,映射时可以进行一些简单的处理,比如简单的数据清洗,舍弃一些没有必要的字段。

1、ODS 层开发

ODS层的设计要点如下:

(1)ODS层的表结构设计依托于从业务系统同步过来的数据结构(JSON/CSV/TSV)

(2)ODS层要保存全部历史数据,故其压缩格式应选择高压缩比的算法,此处选择gzip。

(3)ODS层表名的命名规范为:ods_表名_单分区增量全量标识(inc/full)

注意:ODS 层的表都是分区表,因为我们每天都会有数据被采集到数仓,所以我们的表是按照日期分区的,每天一张表。

我们当前保存在 HDFS 路径下的数据主要有两类:log 和 db,log 目录下存放是我们从 Flume 传过来的用户行为日志文件,我们已经用 gzip 压缩过了,可以通过下面的命令查看:

hadoop fs -cat /origin_data/gmall/log/topic_log/2020-06-14/* | zcat

       可以看到我们的格式是一个 JSON 格式,那我们要建表的话就得考虑怎么把 JSON 格式的数据映射到我们的 Hive 表中了。

       对于 db 目录下的文件主要有两类:DataX 同步过来的以 "full" 为目录后缀的全量业务数据和 Maxwell 同步过来的首日全量数据和以 "inc" 为目录后缀的增量业务数据。

hadoop fs -cat /origin_data/gmall/db/activity_rule_full/2020-06-14/* | zcat

可以看到 DataX 传输过来的文件是 ".tsv" 文件,我们将来只要拿 "\t" 分隔即可。

hadoop fs -cat /origin_data/gmall/db/comment_info_inc/2020-06-14/* | zcat

可以看到,拿 Maxwell 同步过来的数据和 Flume 一样,都是 JSON 格式的。

1.1、用户行为日志表

1.1.1、ROW FORMAT 和 STORE AS

打开 Hive 官网 -> LanguageManual -> DDL -> JSON

查看 Hive 3.x 版本支持的通过解析 JSON 映射表的语法:

CREATE TABLE my_table(a string, b bigint, ...)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'    -- 声明行的格式
STORED AS TEXTFILE;    -- 声明怎么解析文件

       这里的 ROW FORMATSTORE AS 都是 Hive 建表是所必须指定的,只不过 Hive 帮我们简化了这部分语法:

     ROW FORMAT :用 DELIMITED 关键字表示对文件中的每个字段按照特定分割符进行分割用 SERDE 关键字来指定 Hive 内置的 SERDE 或者 用户自定义的 SERDE。

      STORE AS :用 STORED AS + 简写文件格式来指定 InputFormat 和 OutputFormat ,默比如 TextFileInputFormat 和 TextFileOutputFormat 可以用 STORE AS TEXTFILE 来表示。

Hive SerDe

       Hive 的 ROW FORMAT 的 SERDE 属性指定了 Hive 的序列化器和反序列化器,映射 HDFS 文件时,使用反序列化器进行解析,写出文件时使用序列化器来封装数据。

1.1.2、复杂数据类型

       对于用户行为日志,它的存储格式是 json 格式,那我们就需要把它的每个字段映射到我们 Hive 表中。首先回顾一下 Hive 的三种复杂类型:

  • array
  • 声明:array<string>
  • 取值:arr[0]
  • 构造:array(val1,val2...),split(),collect_set()
  • map
  • 声明:map<string,bigint>
  • 取值:map[key]
  • 构造:map(key1,val1,key2,val2...)
  • struct
  • 声明:struct<id:int,name:strnig>
  • 取值:struct.id
  • 构造:named_struct(name1,val1,name2,val2...)

       对于上面的 common 字段,它虽然是键值对格式,但是我们不能使用 map ,因为 map 的 value 是同一数据类型,而我们这里的 common 字段不同键的值有的是 int 有的是 string。所以我们使用 struct。

       对于 displays 字段,由于它存储的是数组类型,然后数组嵌套键值对,所以我们需要使用 array(struct) 的嵌套类型。

1.1.3、设计日志表

DROP TABLE IF EXISTS ods_log_inc;
CREATE EXTERNAL TABLE ods_log_inc
(
    `common`   STRUCT<ar :STRING,ba :STRING,ch :STRING,is_new :STRING,md :STRING,mid :STRING,os :STRING,uid :STRING,vc
                      :STRING> COMMENT '公共信息',
    `page`     STRUCT<during_time :STRING,item :STRING,item_type :STRING,last_page_id :STRING,page_id
                      :STRING,source_type :STRING> COMMENT '页面信息',
    `actions`  ARRAY<STRUCT<action_id:STRING,item:STRING,item_type:STRING,ts:BIGINT>> COMMENT '动作信息',
    `displays` ARRAY<STRUCT<display_type :STRING,item :STRING,item_type :STRING,`order` :STRING,pos_id
                            :STRING>> COMMENT '曝光信息',
    `start`    STRUCT<entry :STRING,loading_time :BIGINT,open_ad_id :BIGINT,open_ad_ms :BIGINT,open_ad_skip_ms
                      :BIGINT> COMMENT '启动信息',
    `err`      STRUCT<error_code:BIGINT,msg:STRING> COMMENT '错误信息',
    `ts`       BIGINT  COMMENT '时间戳'
) COMMENT '活动信息表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/gmall/ods/ods_log_inc/';

注意:我们数仓建的基本都是外部表,防止误删数据!

       这里的分区我们指定了一个 String 类型的分区键:如果我们有一个分区 dt='2020-06-14',那么在 HDFS 上就会有一个目录,路径类似于 /user/hive/warehouse/sales/dt=2020-06-14,这个目录就存放了所有 dt 为 '2020-06-14' 的数据。

gzip 和 bzip2 格式的文件可以直接以 textfile 的格式来 load ,而不需要在建表时指定压缩格式(其实就是指定 STORE AS )。

但是其他压缩格式是不行的,比如 LZO 压缩的话,必须指定 STORE AS 的 InputFormat 和 OutputFormat :

CREATE EXTERNAL TABLE IF NOT EXISTS hive_table_name (column_1  datatype_1......column_N datatype_N)
         PARTITIONED BY (partition_col_1 datatype_1 ....col_P  datatype_P)
         ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
         STORED AS INPUTFORMAT  \"com.hadoop.mapred.DeprecatedLzoTextInputFormat\"
                   OUTPUTFORMAT \"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat\";

1.1.4、装载脚本

我们的日志数据每天都要 load 到一张新的分区表中,装载语句还是比较简单的:

load data inpath '/origin_data/gmall/log/topic_log/2020-06-14' into table ods_log_inc partition(dt='2020-06-14')

思考:load 和 location 的区别?

load data会使数据目录发生改变(数据从源路径"移动"到我们表的路径)

location 则不会,而是以此目录作为源数据

注意:load 尽管是移动数据表但是并没有什么开销,因为它只是修改了我们 HDFS 文件块的在 NameNode 中的元数据路径,并没有真的移动数据。

但是我们不能每天都重复编写命令去执行,我们这里直接写一个 shell 脚本:

#!/bin/bash
 
# 定义变量方便修改
APP=gmall
 
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
   do_date=$1
else
   do_date=`date -d "-1 day" +%F`
fi
 
echo ================== 日志日期为 $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log_inc partition(dt='$do_date');
"
hive -e "$sql"

脚本用法:

hdfs_to_ods_log.sh 2020-06-14

可以看到我们的表被成功 load 。

1.2、业务表(全量 & 增量)

1.2.1、建表分析

因为 DataX 全量采集的结果是 tsv 格式所以我们可以直接对应 MySQL 中这些表格的格式,而 Maxwell 同步过来的增量表我们需要用 json 来序列化和反序列化。

比如全量同步的活动信息表:

DROP TABLE IF EXISTS ods_activity_info_full;
CREATE EXTERNAL TABLE ods_activity_info_full
(
    `id`            STRING COMMENT '活动id',
    `activity_name` STRING COMMENT '活动名称',
    `activity_type` STRING COMMENT '活动类型',
    `activity_desc` STRING COMMENT '活动描述',
    `start_time`    STRING COMMENT '开始时间',
    `end_time`      STRING COMMENT '结束时间',
    `create_time`   STRING COMMENT '创建时间'
) COMMENT '活动信息表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    NULL DEFINED AS ''
    LOCATION '/warehouse/gmall/ods/ods_activity_info_full/';

此外,Maxwell 增量同步过来的数据我们需要额外注意,因为 Maxwell 监听到的有三种类型的数据:insert、update 和 delete。其中 update 类型的数据它的 old 字段保存的是修改的数据字段,而 data 字段保存的是全部内容。

比如首日增量同步的购物车表:

DROP TABLE IF EXISTS ods_cart_info_inc;
CREATE EXTERNAL TABLE ods_cart_info_inc
(
    `type` STRING COMMENT '变动类型',
    `ts`   BIGINT COMMENT '变动时间',
    `data` STRUCT<id :STRING,user_id :STRING,sku_id :STRING,cart_price :DECIMAL(16, 2),sku_num :BIGINT,img_url :STRING,sku_name
                  :STRING,is_checked :STRING,create_time :STRING,operate_time :STRING,is_ordered :STRING,order_time
                  :STRING,source_type :STRING,source_id :STRING> COMMENT '数据',
    `old`  MAP<STRING,STRING> COMMENT '旧值'
) COMMENT '购物车增量表'
    PARTITIONED BY (`dt` STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.JsonSerDe'
    LOCATION '/warehouse/gmall/ods/ods_cart_info_inc/';

       但是我们这里的 database 字段、old 和 table 字段都不需要映射到我们的表中,因为它并没有意义。我们需要保留 type 字段,因为我们要知道用户的操作类型(比如如果这张表是购物车表 cart_info 那 type = 'insert' 就是加入购物车的操作),除此之外,我们还需要保留 old、 ts 和 data 字段,其中 old 我们只能使用 Map<string,string> 来保存,因为 old 字段是发生修改的字段,我们无法确定每个被修改的字段的类型。

注意:对于增量同步的表,我们之前用 Maxwell 做了历史数据首日全量同步,

1.2.2、装载脚本

       上面我们的用户行为日志表因为是一张表,所以我们的参数(日期)只有一个就够了,这里我们的全量和增量表需要指定两个参数(日期和表名)

#!/bin/bash
 
APP=gmall
 
if [ -n "$2" ] ;then
   do_date=$2
else 
   do_date=`date -d '-1 day' +%F`
fi
 
load_data(){
    sql=""
    for i in $*; do
        #判断路径是否存在
        hadoop fs -test -e /origin_data/$APP/db/${i:4}/$do_date
        #路径存在方可装载数据
        if [[ $? = 0 ]]; then
            sql=$sql"load data inpath '/origin_data/$APP/db/${i:4}/$do_date' OVERWRITE into table ${APP}.$i partition(dt='$do_date');"
        fi
    done
    hive -e "$sql"
}
 
case $1 in
    "ods_activity_info_full")
        load_data "ods_activity_info_full"
    ;;
    "ods_activity_rule_full")
        load_data "ods_activity_rule_full"
    ;;
    "ods_base_category1_full")
        load_data "ods_base_category1_full"
    ;;
    "ods_base_category2_full")
        load_data "ods_base_category2_full"
    ;;
    "ods_base_category3_full")
        load_data "ods_base_category3_full"
    ;;
    "ods_base_dic_full")
        load_data "ods_base_dic_full"
    ;;
    "ods_base_province_full")
        load_data "ods_base_province_full"
    ;;
    "ods_base_region_full")
        load_data "ods_base_region_full"
    ;;
    "ods_base_trademark_full")
        load_data "ods_base_trademark_full"
    ;;
    "ods_cart_info_full")
        load_data "ods_cart_info_full"
    ;;
    "ods_coupon_info_full")
        load_data "ods_coupon_info_full"
    ;;
    "ods_sku_attr_value_full")
        load_data "ods_sku_attr_value_full"
    ;;
    "ods_sku_info_full")
        load_data "ods_sku_info_full"
    ;;
    "ods_sku_sale_attr_value_full")
        load_data "ods_sku_sale_attr_value_full"
    ;;
    "ods_spu_info_full")
        load_data "ods_spu_info_full"
    ;;
 
    "ods_cart_info_inc")
        load_data "ods_cart_info_inc"
    ;;
    "ods_comment_info_inc")
        load_data "ods_comment_info_inc"
    ;;
    "ods_coupon_use_inc")
        load_data "ods_coupon_use_inc"
    ;;
    "ods_favor_info_inc")
        load_data "ods_favor_info_inc"
    ;;
    "ods_order_detail_inc")
        load_data "ods_order_detail_inc"
    ;;
    "ods_order_detail_activity_inc")
        load_data "ods_order_detail_activity_inc"
    ;;
    "ods_order_detail_coupon_inc")
        load_data "ods_order_detail_coupon_inc"
    ;;
    "ods_order_info_inc")
        load_data "ods_order_info_inc"
    ;;
    "ods_order_refund_info_inc")
        load_data "ods_order_refund_info_inc"
    ;;
    "ods_order_status_log_inc")
        load_data "ods_order_status_log_inc"
    ;;
    "ods_payment_info_inc")
        load_data "ods_payment_info_inc"
    ;;
    "ods_refund_payment_inc")
        load_data "ods_refund_payment_inc"
    ;;
    "ods_user_info_inc")
        load_data "ods_user_info_inc"
    ;;
    "all")
        load_data "ods_activity_info_full" "ods_activity_rule_full" "ods_base_category1_full" "ods_base_category2_full" "ods_base_category3_full" "ods_base_dic_full" "ods_base_province_full" "ods_base_region_full" "ods_base_trademark_full" "ods_cart_info_full" "ods_coupon_info_full" "ods_sku_attr_value_full" "ods_sku_info_full" "ods_sku_sale_attr_value_full" "ods_spu_info_full" "ods_cart_info_inc" "ods_comment_info_inc" "ods_coupon_use_inc" "ods_favor_info_inc" "ods_order_detail_inc" "ods_order_detail_activity_inc" "ods_order_detail_coupon_inc" "ods_order_info_inc" "ods_order_refund_info_inc" "ods_order_status_log_inc" "ods_payment_info_inc" "ods_refund_payment_inc" "ods_user_info_inc"
    ;;
esac
  • hadoop fs -test -e 用来判断路径是否存在,存在返回 0
  • si{1:4} 返回第 $i 个字符串的4个字符之后的字符串
hdfs_to_ods_db.sh all 2020-06-14

执行完毕,查看 hdfs web 端,共 29 张表,说明我们 load 完毕。

总结

       至此,ODS 搭建完毕。ODS 层其实就是把我们的采集过来的数据(用户行为数据(json 格式)、业务数据(用 DataX 采集过来的数据是 tsv 格式,用 Maxwell 首日全量和之后增量采集过来的是 json 格式)都按照字段映射到我们的 Hive 表中。尤其是用户行为日志,因为业务数据我们还可以参考 MySQL 中的存储格式,但是对于 json 格式的日志数据我们必须自己去设计,而且一些字段还需要进行舍取。        

       Shell 脚本的简单编写还需要复习复习。

DataGrip 字段不显示的配置

Hive 3.3 之后的版本的解析 json 类型的表格时读取不了元数据信息,需要在 hive-site.xml 中配置下面的信息:

<property>
  <name>metastore.storage.schema.reader.impl</name>
  <value>org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader</value>
</property>


相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
3月前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
5天前
|
SQL 数据库 HIVE
hive数仓 ods层增量数据导入
根据业务需求,当表数据量超过10万条时采用增量数据导入,否则全量导入。增量导入基于`create_date`和`modify_date`字段进行,并确保时间字段已建立索引以提升查询效率。避免在索引字段上执行函数操作。创建增量表和全量表,并按日期进行分区。首次导入全量数据,后续每日新增或变更数据保存在增量表中,通过全量表与增量表的合并保持数据一致性。
22 13
|
3月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
24天前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
|
2月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
消息中间件 存储 Kafka
Flink 实时数仓(二)【ODS 层开发】
Flink 实时数仓(二)【ODS 层开发】
|
3月前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
3月前
|
存储 消息中间件 Kafka
Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
3月前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】
|
13天前
|
存储 机器学习/深度学习 监控
阿里云 Hologres OLAP 解决方案评测
随着大数据时代的到来,企业面临着海量数据的挑战,如何高效地进行数据分析和决策变得尤为重要。阿里云推出的 Hologres OLAP(在线分析处理)解决方案,旨在为用户提供快速、高效的数据分析能力。本文将深入探讨 Hologres OLAP 的特点、优势以及应用场景,并针对方案的技术细节、部署指导、代码示例和数据分析需求进行评测。
49 7