大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala

简介: 大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(已更完)

Druid(已更完)

Kylin(正在更新…)

章节内容

上节我们完成了如下的内容:


全量增量Cube的构建

Segment

ce9d9bef8dae434f654ba669492e096a_f8e8d74830644f3585cf57fc4638fd06.png 基本流程

在 Apache Kylin 中,手动触发 Segment 合并的步骤如下:


选择要合并的 Cube 和 Segments: 进入 Kylin Web UI,选择你要操作的 Cube,进入该 Cube 的详情页面。在“Segments”标签页下,可以看到当前 Cube 的所有 Segments。选择你希望合并的 Segments。


合并 Segments: 点击页面上的 “Merge Segment” 按钮。通常情况下,Kylin 会自动计算可以合并的 Segments。如果你想手动控制合并的 Segments,可以在弹出的对话框中手动选择你想合并的 Segments。


配置合并任务: 配置合并任务的参数,如目标时间范围等。Kylin 会根据你选择的 Segments 的范围自动填充一些默认的值。你可以根据需求调整这些参数。


启动合并任务: 完成配置后,点击 “Submit” 按钮。Kylin 将会创建一个新的合并任务(Job),该任务将在后台执行。你可以在 “Job” 页面查看任务的执行状态。


监控任务状态: 在 “Job” 页面,你可以查看合并任务的日志和状态。如果任务执行成功,你会看到新的 Segment 出现在 Segments 列表中,表示合并已经完成。


如果合并成功,新的合并后 Segment 会替代原来的多个 Segments,而旧的 Segments 将被 Kylin 自动清理。


需要注意的是,手动合并的操作可能会占用大量资源,因此在高负载时需要谨慎操作,并在合适的时间段执行合并任务。


手动触发合并Segment

Kylin提供了一种简单的机制用于控制Cube中Segment的数量:合并Segment,在WebGUI中选中需要进行Segments合并的Cube。


单击Action => Merge

我们刚才分阶段进行了任务的Build操作,

01-01、01-02、01-03、01-04 的任务,我们可以使用 Merge 来进行合并:


选中需要合并的Segment,可以同时合并多个Segment,但这些Segment必须是连续的,单击提交系统会提交一个类型为 MERGE 的构建任务,这里可以选择时间阶段,我选择的是 01-03到01-04:

提交任务,可以看到是一个 Merge任务,看名字:【MERGE】,等待合并完毕:

合并完毕的结果如下图:

注意事项

在MERGE构建结束之前,所有选中用来合并的Segment仍然处于可用的状态

在MERGE类型的构建完成之前,系统将不允许提交这个Cube上任何类型的其他构建任务

当MERGE构建结束的时候,系统将选中合并的Segment替换为新的Segment,而被替换下的Segment等待将被垃圾回收和清理,以节省系统资源

删除Segment

使用WebUI删除Cube的Segment,

这里选择 Disable 就可以删除Segment了:

Disable之后,可以看到下面的:DeleteSegment操作,就可以删除指定的Segment了:

自动合并

手动维护Segment很繁琐,人工成本高,Kylin中是可以支持自动合并Segment。

在Cube Designer的 Refresh Settings的页面中有:


Auto Merge Thresholds

Retention Thresholds

Refresh Settings的页面:

03ff1a5dc647869ddc589a46b904d0c6_e1fd41b700184926bd1bc403e85b0fbd.png 两个设置项可以用来帮助管理Segment碎片,这两项设置搭配使用这两项设置可以大大减少对Segment进行管理的麻烦。


Auto Merge Thresholds

允许用户设置几个层级的时间阈值,层级越靠后,时间阈值越大

每当Cube中有新的Segment状态变为READY的时候,就会自动触发一次系统自动合并

合并策略

尝试最大一级的时间阈值,例如:针对(7天、28天)层级的日志,先检查能够将连续的若干个Segment合并成为一个超过28天的大Segment

如果有个别的Segment的事件长度本身已经超过28天,系统会跳过Segment

如果满足条件的连续Segment还不能够累积超过28天,那么系统会使用下一个层级的时间戳重复寻找

案例1 理解Kylin自动合并策略

假设自动合并阈值设置为7天、28天

如果现在有A-H 8个连续的Segment,它们的时间长度为28天(A)、7天(B)、1天(C)、一天(D)、一天(E)、一天(F)、一天(G)、一天(H)

此时,第9个Segment加入,时间长度为1天

自动合并的策略为:


Kylin判断时能够将连续的Segment合并到28天这个阈值,由于Segment A已经超过28天,会被排除。

剩下的连续Segment,所有时间加一起 B+C+D+E+F+G+H+I < 28天,无法满足28天的阈值,则开始尝试7天的阈值

跳过 A(28)、B(7)均超过7天,排除

剩下的连续Segment,所有时间加在一起 C+D+E+F+G+H+I 达到7天的阈值,触发合并,提交Merge任务,并构建一个SegmentX(7天)

合并后,Segment为:A(28天)、B(7天)、X(7天)

连续触发检查,A(28天)跳过,B+X(7+7=14)< 28天,不满足第一阈值,重新使用第二阈值触发

跳过B、X尝试终止

案例2 配置自动合并4天的Segment

选中Model,选择Edit进行编辑:

直接到Refresh Setting选项卡,将选项修改为,4天:

后续将自动进行Segment的构建。


配置保留的Segment

自动合并是将多个Segment合并为一个Segment,以达到清理碎片的目的,保留Segment则是及时清理不再使用的Segment。

在很多场景中,只会对过去一段时间内的数据进行查询,例如:


对于某个只显示过去1年数据的报表

支持它的Cube其实只需要保留过去一年类的Segment即可

由于数据在Hive中已经存在备份,则不需在Kylin中备份超过一年的类似数据

可以将Retention Threshold设置为365,每当有新的Segment状态变为READY的时候,系统会检查每一个Segment。如果它的结束时间距离最晚的一个Segment的结束时间已经大于等于RetentionThreshold,那么这个Segment将视为无需保留,系统会自动从Cube中删除这个Segment。


保留策略示意图如下所示:

e949052b6cd4fa7bda1195c24a0f3608_6961f28fb59b45998115732af0024796.png 使用JDBC连接操作Kylin

简单介绍

要将数据以可视化方式展示出来,需要使用Kylin的JDBC方式连接执行SQL,获取Kylin的执行结果

使用Kylin的JDBC与JDBC操作MySQL一致

业务需求

通过JDBC的方式,查询按照日期、区域、产品维度统计订单总额/总数量结果


开发步骤

添加依赖

<dependency>
  <groupId>org.apache.kylin</groupId>
  <artifactId>kylin-jdbc</artifactId>
  <version>3.1.1</version>
</dependency>

实现规划

  • 创建Connection连接对象
  • 构建SQL语句
  • 创建Statement对象,并执行executeQuery
  • 打印结果

编写代码

我这里用Scala实现了,Java也差不多

package icu.wzk.kylin

import java.sql.DriverManager

object KylinJdbcTest {

  def main(args: Array[String]): Unit = {
    // 创建连接对象
    val connection = DriverManager.getConnection("jdbc:kylin://h122.wzk.icu:7070/wzk_test_kylin", "ADMIN", "KYLIN")
    // 创建Statement
    val statement = connection.createStatement();
    // 构建SQL语句
    var sql =
      """
        |select
        | t1.dt,
        | t2.regionid,
        | t2.regionname,
        | t3.productid,
        | t3.productname,
        | sum(t1.price) as total_money,
        | sum(t1.amount) as total_amount
        |from
        | dw_sales1 t1
        |inner join dim_region t2
        |on t1.regionid = t2.regionid
        |inner join dim_product t3
        |on t1.productid = t3.productid
        |group by
        | t1.dt,
        | t2.regionid,
        | t2.regionname,
        | t3.productid,
        | t3.productname
        |order by
        | t1.dt,
        | t2.regionname,
        | t3.productname
        |""".stripMargin
    val resultSet = statement.executeQuery(sql)
    println("dt region product_name total_money total_amount")
    while (resultSet.next()) {
      // 获取时间
      val dt = resultSet.getString("dt")
      // 获取区域名称
      val regionName = resultSet.getString("regionname")
      // 获取产品名称
      val productName = resultSet.getString("productname")
      // 获取累计金额
      val totalMoney = resultSet.getDouble("total_money")
      // 获取累计数量
      val totalAmount = resultSet.getDouble("total_amount")
      println(f"$dt $regionName $productName $totalMoney $totalAmount")
    }
    connection.close()
  }

}

测试运行

我们运行代码,可以看到如下的运行结果:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
8月前
|
机器学习/深度学习 算法 大数据
构建数据中台,为什么“湖仓一体”成了大厂标配?
在大数据时代,数据湖与数据仓库各具优势,但单一架构难以应对复杂业务需求。湖仓一体通过融合数据湖的灵活性与数据仓的规范性,实现数据分层治理、统一调度,既能承载海量多源数据,又能支撑高效分析决策,成为企业构建数据中台、推动智能化转型的关键路径。
|
11月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
9月前
|
存储 SQL 分布式计算
MaxCompute x 聚水潭:基于近实时数仓解决方案构建统一增全量一体化数据链路
聚水潭作为中国领先的电商SaaS ERP服务商,致力于为88,400+客户提供全链路数字化解决方案。其核心ERP产品助力企业实现数据驱动的智能决策。为应对业务扩展带来的数据处理挑战,聚水潭采用MaxCompute近实时数仓Delta Table方案,有效提升数据新鲜度和计算效率,提效比例超200%,资源消耗显著降低。未来,聚水潭将进一步优化数据链路,结合MaxQA实现实时分析,赋能商家快速响应市场变化。
398 0
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1710 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
919 2
|
存储 分布式计算 大数据
基于阿里云大数据平台的实时数据湖构建与数据分析实战
在大数据时代,数据湖作为集中存储和处理海量数据的架构,成为企业数据管理的核心。阿里云提供包括MaxCompute、DataWorks、E-MapReduce等在内的完整大数据平台,支持从数据采集、存储、处理到分析的全流程。本文通过电商平台案例,展示如何基于阿里云构建实时数据湖,实现数据价值挖掘。平台优势包括全托管服务、高扩展性、丰富的生态集成和强大的数据分析工具。
|
6月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
1098 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
545 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式