大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-162 Apache Kylin 全量增量Cube的构建 Segment 超详细记录 多图

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(已更完)

Druid(已更完)

Kylin(正在更新…)

章节内容

上节我们完成了如下的内容:


构建Cube 按照日期、区域、产品、渠道

Cube 优化方案

3e32f864238de1a789d8b024468dd0e7_439306727b4d44098dc6719301abde76.png 增量 Cube

在大多数业务场景下,Hive中的数据处于不断增长的状态

为了支持在构建Cube,无需重复处理历史数据,引入增量构建功能

Segment

Kylin将Cube划分为多个Segment(对应就是HBase中的一个表)


一个Cube可能由1个或多个Segment组成,Segment是指定时间范围的Cube,可以理解为Cube的分区

Segment是针对源数据中的某个片段计算出来的Cube数据,代表一段时间内源数据的预计计算结果

每个Segment用起始时间和结束时间来标志

一个Segment的起始时间等于它之前Segment的结束前时间,它的结束时间等于它后面那个Segment的起始时间

同一个Cube下不同的Segment除了背后的源数据不同之外,其他如结构定义、构建过程、优化方法、存储方式等完全相同

Segment示意图

例如:以下为针对某个Cube的Segment

量构建与增量构建

全量构建

在全量构建中:


Cube中存在唯一一个Segment

每Segment没有分割时间的概念,即没有起始时间和结束时间

对于全量构建来说,每当需要更新Cube数据时,它不会区分历史数据和新加入的数据,即在构建时导入并处理所有的数据

增量构建

在增量构建中:


只会导入新Segment指定的时间区间内的原始数据,并只对这部分原始数据进行预计算

相互对比

3d753c97b3c2c4463c82910a9871d01d_8473e09d603142ff9da28625154e6373.png 全量构建与增量构建的Cube查询的方式对比:

全量构建Cube:


查询引擎只需要向存储引擎访问单个Segment所对应的数据,无需进行Segment之间的聚合

为了加强性能,单个Segment的数据也有可能被分片存储到引擎的多个分区上,查询引擎可能仍然需要对单个Segment不同分区的数据进一步聚合

增量构建Cube:


由于不同的时间的数据分布在不同的Segment中,查询引擎需要向存储引擎请求读取各个Segment的数据

增量构建的Cube上的查询会比全量构建的做更多的运行时聚合,通常来说增量构建的Cube上查询会比全量构建的Cube上的查询要慢一些

对于小数据量的Cube,或者经常需要全表更新的Cube,使用全量构建需要更少的运维精力,以少量的重复计算降低生产环境中的维护复杂度。

对于大数据量的Cube,例一个包含较长历史数据的Cube,如果每天更新,那么大量的资源是在用于重复计算,这个情况下可以考虑使用增量构建。


增量构建Cube过程

指定分割时间列

增量构建Cube的定义必须包含一个时间维度,用来分割不同的Segment,这样的维度称为分割时间列(Partition Date Column)。


增量构建过程

在进行增量构建时,将增量部分的起始时间和结束时间作为增量构建请求的一部分提交给Kylin的任务引擎

任务引擎会根据起始时间和结束时间从Hive中抽取相应时间的数据,并对这部分数据做预处理计算

将预计算的结果封装成一个新的Segment,并将相应的信息保存到元数据和存储引擎中,一般来说,增量部分的起始时间等于Cube中最后一个Segment的结束时间

增量Cube构建

步骤:定义数据源 => 定义Model => 定义Cube => 构建Cube


SQL 语句

-- 数据结构类似,只是改为了分区表
drop table wzk_kylin.dw_sales1;
create table wzk_kylin.dw_sales1(
  id string,
  channelId string,
  productId string,
  regionId string,
  amount int,
  price double
)
partitioned by (dt string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

-- 加载数据
load data local inpath "dw_sales20240101_data.txt"
into table wzk_kylin.dw_sales1
partition(dt="2024-01-01");
load data local inpath "dw_sales20240102_data.txt"
into table wzk_kylin.dw_sales1
partition(dt="2024-01-02");
load data local inpath "dw_sales20240103_data.txt"
into table wzk_kylin.dw_sales1
partition(dt="2024-01-03");
load data local inpath "dw_sales20240104_data.txt"
into table wzk_kylin.dw_sales1
partition(dt="2024-01-04");

生成数据

同样,我们先编写一个脚本来生成对应的数据:

import random

# 设置参数
dates = ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]
num_records_per_file = 100

# 定义可能的值
channel_ids = ['C001', 'C002', 'C003', 'C004']
product_ids = ['P001', 'P002', 'P003', 'P004']
region_ids = ['R001', 'R002', 'R003', 'R004']

# 生成数据
for dt in dates:
    output_file = f'dw_sales{dt.replace("-", "")}_data.txt'
    
    with open(output_file, 'w') as f:
        for i in range(num_records_per_file):
            record_id = f"{i+1:04d}"
            channel_id = random.choice(channel_ids)
            product_id = random.choice(product_ids)
            region_id = random.choice(region_ids)
            amount = random.randint(1, 100)
            price = round(random.uniform(10.0, 500.0), 2)
            
            line = f"{record_id},{channel_id},{product_id},{region_id},{amount},{price}\n"
            f.write(line)
    
    print(f"{num_records_per_file} records have been written to {output_file}")

print("All data files have been generated.")

执行的结果如下图所示:

上传数据

通过你习惯的方式,将这几个txt上传到服务器上,准备执行:

执行脚本

hive -f kylin_partition.sql
• 1

执行结果如下图:

加载数据源

Load Table From Tree

选择刚才创建的表,wzk_kylin.dw_sales1:

定义Model

增量构建的Cube需要指定分割时间列,例如:将日期分区字段添加到维度列中:

Data Model:New Join Condition,需要配置好几个:

配置成如下的结果:

维度配置如下图所示:

度量选择 AMOUNT 和 PRICE,最后的设置:

定义Cube

填写名字等跳过,维度需要添加 DT、其他都要:

配置完的结果如下图:

度量配置如下:(Bulk Add Measures 快速配置)

剩余的信息都默认填写即可:

构建Cube

接下来构建Cube的时候,进行Build:

选部分的日期,就不选所有数据了:

继续等待构建完毕:

查看Segment

刚才我们构建了


2024-01-01 到 2024-01-02 的数据

我们继续build 2024-01-02 到 2024-01-03

完成后继续build 2024-01-03 到 2024-01-04

分段的进行build的任务,最后我们查看 Segment如下:

2024-01-01 到 2024-01-02 完成之后,我们继续任务:

2024-01-02 到 2024-01-03 完成之后,我们继续任务:

漫长等待,任务都完成之后如下图所示:

查询测试

第一部分:按日期和地区汇总销售数据

-- 第一部分查询:按日期和地区汇总销售数据
SELECT 
    t1.dt,
    t2.regionname,
    SUM(t1.price) AS total_money,
    SUM(t1.amount) AS total_amount,
    MAX(t1.price) AS max_price,
    MIN(t1.amount) AS min_amount
FROM 
    dw_sales1 t1
JOIN 
    dim_region t2 
ON 
    t1.regionid = t2.regionid
GROUP BY 
    t1.dt, 
    t2.regionname
ORDER BY 
    t1.dt;

运行的结果如下图所示:

另一部分:按日期、地区和产品汇总销售数据

-- 第二部分查询:按日期、地区和产品汇总销售数据
SELECT 
    t1.dt,
    t2.regionid,
    t2.regionname,
    t3.productid,
    t3.productname,
    SUM(t1.price) AS total_money,
    SUM(t1.amount) AS total_amount
FROM 
    dw_sales1 t1
INNER JOIN 
    dim_region t2 
ON 
    t1.regionid = t2.regionid
INNER JOIN 
    dim_product t3 
ON 
    t1.productid = t3.productid
GROUP BY 
    t1.dt,
    t2.regionid,
    t2.regionname,
    t3.productid,
    t3.productname
ORDER BY 
    t1.dt,
    t2.regionname,
    t3.productname;

查询结果如下图所示:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
137 5
|
3月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
145 4
|
3月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
152 61
|
3月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
85 1
|
6月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
60 1
|
2月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
394 33
The Past, Present and Future of Apache Flink
|
4月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1022 13
Apache Flink 2.0-preview released
|
4月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
172 3
|
5月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
6月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
385 2

推荐镜像

更多