大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-163 Apache Kylin 全量增量Cube的构建 手动触发合并 JDBC 操作 Scala

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(已更完)

ClickHouse(已更完)

Kudu(已更完)

Druid(已更完)

Kylin(正在更新…)

章节内容

上节我们完成了如下的内容:


全量增量Cube的构建

Segment

ce9d9bef8dae434f654ba669492e096a_f8e8d74830644f3585cf57fc4638fd06.png 基本流程

在 Apache Kylin 中,手动触发 Segment 合并的步骤如下:


选择要合并的 Cube 和 Segments: 进入 Kylin Web UI,选择你要操作的 Cube,进入该 Cube 的详情页面。在“Segments”标签页下,可以看到当前 Cube 的所有 Segments。选择你希望合并的 Segments。


合并 Segments: 点击页面上的 “Merge Segment” 按钮。通常情况下,Kylin 会自动计算可以合并的 Segments。如果你想手动控制合并的 Segments,可以在弹出的对话框中手动选择你想合并的 Segments。


配置合并任务: 配置合并任务的参数,如目标时间范围等。Kylin 会根据你选择的 Segments 的范围自动填充一些默认的值。你可以根据需求调整这些参数。


启动合并任务: 完成配置后,点击 “Submit” 按钮。Kylin 将会创建一个新的合并任务(Job),该任务将在后台执行。你可以在 “Job” 页面查看任务的执行状态。


监控任务状态: 在 “Job” 页面,你可以查看合并任务的日志和状态。如果任务执行成功,你会看到新的 Segment 出现在 Segments 列表中,表示合并已经完成。


如果合并成功,新的合并后 Segment 会替代原来的多个 Segments,而旧的 Segments 将被 Kylin 自动清理。


需要注意的是,手动合并的操作可能会占用大量资源,因此在高负载时需要谨慎操作,并在合适的时间段执行合并任务。


手动触发合并Segment

Kylin提供了一种简单的机制用于控制Cube中Segment的数量:合并Segment,在WebGUI中选中需要进行Segments合并的Cube。


单击Action => Merge

我们刚才分阶段进行了任务的Build操作,

01-01、01-02、01-03、01-04 的任务,我们可以使用 Merge 来进行合并:


选中需要合并的Segment,可以同时合并多个Segment,但这些Segment必须是连续的,单击提交系统会提交一个类型为 MERGE 的构建任务,这里可以选择时间阶段,我选择的是 01-03到01-04:

提交任务,可以看到是一个 Merge任务,看名字:【MERGE】,等待合并完毕:

合并完毕的结果如下图:

注意事项

在MERGE构建结束之前,所有选中用来合并的Segment仍然处于可用的状态

在MERGE类型的构建完成之前,系统将不允许提交这个Cube上任何类型的其他构建任务

当MERGE构建结束的时候,系统将选中合并的Segment替换为新的Segment,而被替换下的Segment等待将被垃圾回收和清理,以节省系统资源

删除Segment

使用WebUI删除Cube的Segment,

这里选择 Disable 就可以删除Segment了:

Disable之后,可以看到下面的:DeleteSegment操作,就可以删除指定的Segment了:

自动合并

手动维护Segment很繁琐,人工成本高,Kylin中是可以支持自动合并Segment。

在Cube Designer的 Refresh Settings的页面中有:


Auto Merge Thresholds

Retention Thresholds

Refresh Settings的页面:

03ff1a5dc647869ddc589a46b904d0c6_e1fd41b700184926bd1bc403e85b0fbd.png 两个设置项可以用来帮助管理Segment碎片,这两项设置搭配使用这两项设置可以大大减少对Segment进行管理的麻烦。


Auto Merge Thresholds

允许用户设置几个层级的时间阈值,层级越靠后,时间阈值越大

每当Cube中有新的Segment状态变为READY的时候,就会自动触发一次系统自动合并

合并策略

尝试最大一级的时间阈值,例如:针对(7天、28天)层级的日志,先检查能够将连续的若干个Segment合并成为一个超过28天的大Segment

如果有个别的Segment的事件长度本身已经超过28天,系统会跳过Segment

如果满足条件的连续Segment还不能够累积超过28天,那么系统会使用下一个层级的时间戳重复寻找

案例1 理解Kylin自动合并策略

假设自动合并阈值设置为7天、28天

如果现在有A-H 8个连续的Segment,它们的时间长度为28天(A)、7天(B)、1天(C)、一天(D)、一天(E)、一天(F)、一天(G)、一天(H)

此时,第9个Segment加入,时间长度为1天

自动合并的策略为:


Kylin判断时能够将连续的Segment合并到28天这个阈值,由于Segment A已经超过28天,会被排除。

剩下的连续Segment,所有时间加一起 B+C+D+E+F+G+H+I < 28天,无法满足28天的阈值,则开始尝试7天的阈值

跳过 A(28)、B(7)均超过7天,排除

剩下的连续Segment,所有时间加在一起 C+D+E+F+G+H+I 达到7天的阈值,触发合并,提交Merge任务,并构建一个SegmentX(7天)

合并后,Segment为:A(28天)、B(7天)、X(7天)

连续触发检查,A(28天)跳过,B+X(7+7=14)< 28天,不满足第一阈值,重新使用第二阈值触发

跳过B、X尝试终止

案例2 配置自动合并4天的Segment

选中Model,选择Edit进行编辑:

直接到Refresh Setting选项卡,将选项修改为,4天:

后续将自动进行Segment的构建。


配置保留的Segment

自动合并是将多个Segment合并为一个Segment,以达到清理碎片的目的,保留Segment则是及时清理不再使用的Segment。

在很多场景中,只会对过去一段时间内的数据进行查询,例如:


对于某个只显示过去1年数据的报表

支持它的Cube其实只需要保留过去一年类的Segment即可

由于数据在Hive中已经存在备份,则不需在Kylin中备份超过一年的类似数据

可以将Retention Threshold设置为365,每当有新的Segment状态变为READY的时候,系统会检查每一个Segment。如果它的结束时间距离最晚的一个Segment的结束时间已经大于等于RetentionThreshold,那么这个Segment将视为无需保留,系统会自动从Cube中删除这个Segment。


保留策略示意图如下所示:

e949052b6cd4fa7bda1195c24a0f3608_6961f28fb59b45998115732af0024796.png 使用JDBC连接操作Kylin

简单介绍

要将数据以可视化方式展示出来,需要使用Kylin的JDBC方式连接执行SQL,获取Kylin的执行结果

使用Kylin的JDBC与JDBC操作MySQL一致

业务需求

通过JDBC的方式,查询按照日期、区域、产品维度统计订单总额/总数量结果


开发步骤

添加依赖

<dependency>
  <groupId>org.apache.kylin</groupId>
  <artifactId>kylin-jdbc</artifactId>
  <version>3.1.1</version>
</dependency>

实现规划

  • 创建Connection连接对象
  • 构建SQL语句
  • 创建Statement对象,并执行executeQuery
  • 打印结果

编写代码

我这里用Scala实现了,Java也差不多

package icu.wzk.kylin

import java.sql.DriverManager

object KylinJdbcTest {

  def main(args: Array[String]): Unit = {
    // 创建连接对象
    val connection = DriverManager.getConnection("jdbc:kylin://h122.wzk.icu:7070/wzk_test_kylin", "ADMIN", "KYLIN")
    // 创建Statement
    val statement = connection.createStatement();
    // 构建SQL语句
    var sql =
      """
        |select
        | t1.dt,
        | t2.regionid,
        | t2.regionname,
        | t3.productid,
        | t3.productname,
        | sum(t1.price) as total_money,
        | sum(t1.amount) as total_amount
        |from
        | dw_sales1 t1
        |inner join dim_region t2
        |on t1.regionid = t2.regionid
        |inner join dim_product t3
        |on t1.productid = t3.productid
        |group by
        | t1.dt,
        | t2.regionid,
        | t2.regionname,
        | t3.productid,
        | t3.productname
        |order by
        | t1.dt,
        | t2.regionname,
        | t3.productname
        |""".stripMargin
    val resultSet = statement.executeQuery(sql)
    println("dt region product_name total_money total_amount")
    while (resultSet.next()) {
      // 获取时间
      val dt = resultSet.getString("dt")
      // 获取区域名称
      val regionName = resultSet.getString("regionname")
      // 获取产品名称
      val productName = resultSet.getString("productname")
      // 获取累计金额
      val totalMoney = resultSet.getDouble("total_money")
      // 获取累计数量
      val totalAmount = resultSet.getDouble("total_amount")
      println(f"$dt $regionName $productName $totalMoney $totalAmount")
    }
    connection.close()
  }

}

测试运行

我们运行代码,可以看到如下的运行结果:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
107 5
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
112 4
|
3月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
102 5
|
2月前
|
存储 数据挖掘 数据处理
巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践
随着数据湖技术的发展,企业纷纷探索其优化潜力。本文分享了巴别时代使用 Apache Paimon 构建 Streaming Lakehouse 的实践。Paimon 支持流式和批处理,提供高性能、统一的数据访问和流批一体的优势。通过示例代码和实践经验,展示了如何高效处理实时数据,解决了数据一致性和故障恢复等挑战。
134 61
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
63 1
|
28天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
316 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
908 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
121 3
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多