离线数仓(八)【DWD 层开发】(5)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(八)【DWD 层开发】

离线数仓(八)【DWD 层开发】(4)https://developer.aliyun.com/article/1532419

1.16.2、装载语句

set hive.cbo.enable=false;
insert overwrite table dwd_traffic_display_inc partition(dt='2020-06-14')
select
    province_id,
    brand,
    channel,
    is_new,
    model,
    mid_id,
    operate_system,
    user_id,
    version_code,
    during_time,
    page_item,
    page_item_type,
    last_page_id,
    page_id,
    source_type,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd') date_id,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd HH:mm:ss') display_time,
    display_type,
    display_item,
    display_item_type,
    display_order,
    display_pos_id
from
(
    select
        common.ar area_code,
        common.ba brand,
        common.ch channel,
        common.is_new,
        common.md model,
        common.mid mid_id,
        common.os operate_system,
        common.uid user_id,
        common.vc version_code,
        page.during_time,
        page.item page_item,
        page.item_type page_item_type,
        page.last_page_id,
        page.page_id,
        page.source_type,
        display.display_type,
        display.item display_item,
        display.item_type display_item_type,
        display.`order` display_order,
        display.pos_id display_pos_id,
        ts
    from ods_log_inc lateral view explode(displays) tmp as display
    where dt='2020-06-14'
    and displays is not null
)log
left join
(
    select
        id province_id,
        area_code
    from ods_base_province_full
    where dt='2020-06-14'
)bp
on log.area_code=bp.area_code;

1.17、流量域错误事务事实表

       错误日志有可能是来自于页面日志,也有可能是来自启动日志。

1.17.1、建表语句

DROP TABLE IF EXISTS dwd_traffic_error_inc;
CREATE EXTERNAL TABLE dwd_traffic_error_inc
(
    `province_id`     STRING COMMENT '地区编码',
    `brand`           STRING COMMENT '手机品牌',
    `channel`         STRING COMMENT '渠道',
    `is_new`          STRING COMMENT '是否首次启动',
    `model`           STRING COMMENT '手机型号',
    `mid_id`          STRING COMMENT '设备id',
    `operate_system`  STRING COMMENT '操作系统',
    `user_id`         STRING COMMENT '会员id',
    `version_code`    STRING COMMENT 'app版本号',
    `page_item`       STRING COMMENT '目标id ',
    `page_item_type`  STRING COMMENT '目标类型',
    `last_page_id`    STRING COMMENT '上页类型',
    `page_id`         STRING COMMENT '页面ID ',
    `source_type`     STRING COMMENT '来源类型',
    `entry`           STRING COMMENT 'icon手机图标  notice 通知',
    `loading_time`    STRING COMMENT '启动加载时间',
    `open_ad_id`      STRING COMMENT '广告页ID ',
    `open_ad_ms`      STRING COMMENT '广告总共播放时间',
    `open_ad_skip_ms` STRING COMMENT '用户跳过广告时点',
    `actions`         ARRAY<STRUCT<action_id:STRING,item:STRING,item_type:STRING,ts:BIGINT>> COMMENT '动作信息',
    `displays`        ARRAY<STRUCT<display_type :STRING,item :STRING,item_type :STRING,`order` :STRING,pos_id
                                   :STRING>> COMMENT '曝光信息',
    `date_id`         STRING COMMENT '日期id',
    `error_time`      STRING COMMENT '错误时间',
    `error_code`      STRING COMMENT '错误码',
    `error_msg`       STRING COMMENT '错误信息'
) COMMENT '错误日志表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_traffic_error_inc'
    TBLPROPERTIES ('orc.compress' = 'snappy');

       这里的 actions 和 displays 字段并没有使用炸裂函数,因为这样会破坏我们的粒度,如果将来需要用到这个数据的时候就需要炸裂函数了。

1.17.2、装载语句

set hive.cbo.enable=false;
set hive.execution.engine=mr;
insert overwrite table dwd_traffic_error_inc partition(dt='2020-06-14')
select
    province_id,
    brand,
    channel,
    is_new,
    model,
    mid_id,
    operate_system,
    user_id,
    version_code,
    page_item,
    page_item_type,
    last_page_id,
    page_id,
    source_type,
    entry,
    loading_time,
    open_ad_id,
    open_ad_ms,
    open_ad_skip_ms,
    actions,
    displays,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd') date_id,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd HH:mm:ss') error_time,
    error_code,
    error_msg
from
(
    select
        common.ar area_code,
        common.ba brand,
        common.ch channel,
        common.is_new,
        common.md model,
        common.mid mid_id,
        common.os operate_system,
        common.uid user_id,
        common.vc version_code,
        page.during_time,
        page.item page_item,
        page.item_type page_item_type,
        page.last_page_id,
        page.page_id,
        page.source_type,
        `start`.entry,
        `start`.loading_time,
        `start`.open_ad_id,
        `start`.open_ad_ms,
        `start`.open_ad_skip_ms,
        actions,
        displays,
        err.error_code,
        err.msg error_msg,
        ts
    from ods_log_inc
    where dt='2020-06-14'
    and err is not null
)log
join
(
    select
        id province_id,
        area_code
    from ods_base_province_full
    where dt='2020-06-14'
)bp
on log.area_code=bp.area_code;
set hive.execution.engine=spark;

注意:这里的 actions、displays 对应到 Java 中是 List 类型,在 Hive On Spark 中报错 不期望的类型的列,这是 Hive On Spark 的一个 bug,我们需要通过切换 Hvie 引擎来解决。

1.18、用户域用户注册事务事实表

1.18.1、建表语句

DROP TABLE IF EXISTS dwd_user_register_inc;
CREATE EXTERNAL TABLE dwd_user_register_inc
(
    `user_id`        STRING COMMENT '用户ID',
    `date_id`        STRING COMMENT '日期ID',
    `create_time`    STRING COMMENT '注册时间',
    `channel`        STRING COMMENT '应用下载渠道',
    `province_id`    STRING COMMENT '省份id',
    `version_code`   STRING COMMENT '应用版本',
    `mid_id`         STRING COMMENT '设备id',
    `brand`          STRING COMMENT '设备品牌',
    `model`          STRING COMMENT '设备型号',
    `operate_system` STRING COMMENT '设备操作系统'
) COMMENT '用户域用户注册事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_user_register_inc/'
    TBLPROPERTIES ("orc.compress" = "snappy");

1.18.2、首日装载

       注册成功会影响哪些表:显然用户信息表会插入一条记录,日志中会多一条页面为注册页面并且带有用户id的记日志录。

       但是如果只用 user_info 去设计这张注册事实表维度信息太少了(只有用户和时间维度),我们希望尽可能丰富维度属性,所以这里我们会从地区表、日志表去获取更多的维度信息。

       这里我们的表当然是以表为主,而不是以日志为主,因为业务数据更加靠谱(比如业务数据库有事务保证,比如涉及到金额的操作,我们绝不会用日志去传输)

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_user_register_inc partition(dt)
select
    ui.user_id,
    date_format(create_time,'yyyy-MM-dd') date_id,
    create_time,
    channel,
    province_id,
    version_code,
    mid_id,
    brand,
    model,
    operate_system,
    date_format(create_time,'yyyy-MM-dd')
from
(
    select
        data.id user_id,
        data.create_time
    from ods_user_info_inc
    where dt='2020-06-14'
    and type='bootstrap-insert'
)ui
left join
(
    select
        common.ar area_code,
        common.ba brand,
        common.ch channel,
        common.md model,
        common.mid mid_id,
        common.os operate_system,
        common.uid user_id,
        common.vc version_code
    from ods_log_inc
    where dt='2020-06-14'
    and page.page_id='register'
    and common.uid is not null
)log
on ui.user_id=log.user_id
left join
(
    select
        id province_id,
        area_code
    from ods_base_province_full
    where dt='2020-06-14'
)bp
on log.area_code=bp.area_code;

1.18.3、每日装载

insert overwrite table dwd_user_register_inc partition(dt='2020-06-15')
select
    ui.user_id,
    date_format(create_time,'yyyy-MM-dd') date_id,
    create_time,
    channel,
    province_id,
    version_code,
    mid_id,
    brand,
    model,
    operate_system
from
(
    select
        data.id user_id,
        data.create_time
    from ods_user_info_inc
    where dt='2020-06-15'
    and type='insert'
)ui
left join
(
    select
        common.ar area_code,
        common.ba brand,
        common.ch channel,
        common.md model,
        common.mid mid_id,
        common.os operate_system,
        common.uid user_id,
        common.vc version_code
    from ods_log_inc
    where dt='2020-06-15'
    and page.page_id='register'
    and common.uid is not null
)log
on ui.user_id=log.user_id
left join
(
    select
        id province_id,
        area_code
    from ods_base_province_full
    where dt='2020-06-15'
)bp
on log.area_code=bp.area_code;

1.19、用户域用户登录事务事实表

1.19.1、建表语句

DROP TABLE IF EXISTS dwd_user_login_inc;
CREATE EXTERNAL TABLE dwd_user_login_inc
(
    `user_id`        STRING COMMENT '用户ID',
    `date_id`        STRING COMMENT '日期ID',
    `login_time`     STRING COMMENT '登录时间',
    `channel`        STRING COMMENT '应用下载渠道',
    `province_id`    STRING COMMENT '省份id',
    `version_code`   STRING COMMENT '应用版本',
    `mid_id`         STRING COMMENT '设备id',
    `brand`          STRING COMMENT '设备品牌',
    `model`          STRING COMMENT '设备型号',
    `operate_system` STRING COMMENT '设备操作系统'
) COMMENT '用户域用户登录事务事实表'
    PARTITIONED BY (`dt` STRING)
    STORED AS ORC
    LOCATION '/warehouse/gmall/dwd/dwd_user_login_inc/'
    TBLPROPERTIES ("orc.compress" = "snappy");

1.19.2、数据装载

首先,数据来自于日志,日志没有历史数据,所以我们并不需要区分首日次日。

我们的登录场景根据日志中有没有 user_id 可以大致分我三类:从始至终没有登录、浏览到一半登录,打开网页时后台自动登录。

insert overwrite table dwd_user_login_inc partition(dt='2020-06-14')
select
    user_id,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd') date_id,
    date_format(from_utc_timestamp(ts,'GMT+8'),'yyyy-MM-dd HH:mm:ss') login_time,
    channel,
    province_id,
    version_code,
    mid_id,
    brand,
    model,
    operate_system
from
(
    select
        user_id,
        channel,
        area_code,
        version_code,
        mid_id,
        brand,
        model,
        operate_system,
        ts
    from
    (
        select
            user_id,
            channel,
            area_code,
            version_code,
            mid_id,
            brand,
            model,
            operate_system,
            ts,
            row_number() over (partition by session_id order by ts) rn
        from
        (
            select
                user_id,
                channel,
                area_code,
                version_code,
                mid_id,
                brand,
                model,
                operate_system,
                ts,
                concat(mid_id,'-',last_value(session_start_point,true) over(partition by mid_id order by ts)) session_id
            from
            (
                select
                    common.uid user_id,
                    common.ch channel,
                    common.ar area_code,
                    common.vc version_code,
                    common.mid mid_id,
                    common.ba brand,
                    common.md model,
                    common.os operate_system,
                    ts,
                    if(page.last_page_id is null,ts,null) session_start_point
                from ods_log_inc
                where dt='2020-06-14'
                and page is not null
            )t1
        )t2
        where user_id is not null
    )t3
    where rn=1
)t4
left join
(
    select
        id province_id,
        area_code
    from ods_base_province_full
    where dt='2020-06-14'
)bp
on t4.area_code=bp.area_code;

脚本省略,注意:严格模式最好直接配到配置文件里去:

<property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
</property>

总结

       至此,DWD 层搭建完毕,DWD 层一般都是和 DIM 层配置使用的,一个负责提供业务过程相关行为信息,一个负责提供该业务过程的维度信息。

       距离 DIM 层开发过去了整整两周,这两部分内容太重要了,所以学的很慢,但还是要回头慢慢再消化消化。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2月前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
2月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
4天前
|
存储 消息中间件 大数据
什么是实时数仓,与离线数仓的区别是什么?
什么是实时数仓,与离线数仓的区别是什么?
|
1月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
消息中间件 存储 Kafka
Flink 实时数仓(二)【ODS 层开发】
Flink 实时数仓(二)【ODS 层开发】
|
2月前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
23天前
|
存储 数据挖掘 BI
数据仓库深度解析与实时数仓应用案例探析
随着数据量的不断增长和数据应用的广泛深入,数据治理和隐私保护将成为数据仓库建设的重要议题。企业需要建立完善的数据治理体系,确保数据的准确性、一致性和完整性;同时加强隐私保护机制建设,确保敏感数据的安全性和合规性。
128 55
|
6天前
|
消息中间件 存储 大数据
大数据-数据仓库-实时数仓架构分析
大数据-数据仓库-实时数仓架构分析
22 1
|
23天前
|
存储 消息中间件 数据挖掘
数据仓库的深度探索与实时数仓应用案例解析
大数据技术的发展,使得数据仓库能够支持大量和复杂数据类型(如文本、图像、视频、音频等)。数据湖作为一种新的数据存储架构,强调原始数据的全面保留和灵活访问,与数据仓库形成互补,共同支持企业的数据分析需求。
|
2月前
|
Cloud Native 数据管理 OLAP
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
389 2
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区

热门文章

最新文章