Iceberg实战踩坑指南(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Iceberg实战踩坑指南

 4  DataFrame 操作

4.1.配置 Resources

1)将自己 hadoop 集群的客户端配置文件复制到 resource 下,方便 local 模式调试

4.2配置 pom.xml

1)配置相关依赖

<properties>
    <spark.version>3.0.1</spark.version>
    <scala.version>2.12.10</scala.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.22</slf4j.version>
    <iceberg.version>0.11.1</iceberg.version>
</properties>
<dependencies>
    <!-- Spark 的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- 引入Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-runtime</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-extensions</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
<build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.1</version>
        <executions>
            <execution>
                <id>compile-scala</id>
                <goals>
                    <goal>add-source</goal>
                    <goal>compile</goal>
                </goals>
            </execution>
            <execution>
                <id>test-compile-scala</id>
                <goals>
                    <goal>add-source</goal>
                    <goal>testCompile</goal>
               </goals>
           </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
            </manifest>
          </archive>
          <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
     </plugin>
   </plugins>
</build>

4.3读取表

package com.atguigu.iceberg.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object TableOperations {
  def main(args: Array[String]): Unit = { val sparkConf = new SparkConf()
   .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
   .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
   .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
   .set("spark.sql.catalog.catalog-name.type", "hadoop")
   .set("spark.sql.catalog.catalog-name.default-namespace", "default")
   .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
   .set("spark.sql.session.timeZone", "GMT+8")
   .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 
    readTale(sparkSession)
}
/**
*读取iceberg 的表
*@param sparkSession
*/
  def readTale(sparkSession: SparkSession) = {
     //三种方式sparkSession.table("hadoop_prod.db.testA").show()
     sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show() 
     sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show()// 路径到表就行,不要到具体文件
  }
}

4.4读取快照

 def readSnapShots(sparkSession: SparkSession) = {
  //根据查询 hadoop_prod.db.testA.snapshots 快照表可以知道快照时间和快照id
  //根据时间戳读取,必须是时间戳 不能使用格式化后的时间
  sparkSession.read
  .option("as-of-timestamp", "1624961454000") //毫秒时间戳,查询比该值时间更早的快照
  .format("iceberg")
  .load("hadoop_prod.db.testA").show()
  //根据快照 id 查询
  sparkSession.read
     .option("snapshot-id", "9054909815461789342")
     .format("iceberg")
     .load("hadoop_prod.db.testA").show()
  }
}

4.5写入表

4.5.1写入数据并创建表

1)编写代码执行

case class Student(id: Int, name: String, age: Int, dt: String)
  def writeAndCreateTable(sparkSession: SparkSession) = { 
    import  sparkSession.implicits._
    import org.apache.spark.sql.functions._
    val data = sparkSession.createDataset[Student](Array(Student(1001, " 张 三 ", 18, "2021-06-28"),
    Student(1002, "李四", 19, "2021-06-29"), Student(1003, "王五", 20, "2021-06-29")))
    data.writeTo("hadoop_prod.db.test1").partitionedBy(col("dt")) //指定dt 为分区列
.create()
  }
}

2)验证,进入 spark sql 窗口,查看表结构和表数据

spark-sql (default)> desc test1;
spark-sql (default)> select *from test1;
spark-sql (default)> desc test1;

spark-sql (default)> select *from test1;

spark-sql (default)> select *from test1;

3)查看 hdfs,是否按 dt 进行分区

4.5.1写数据

4.5.1.1  Append

1)编写代码,执行

def AppendTable(sparkSession: SparkSession) = {
  //两种方式
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(Student(1003, "王五", 11, "2021-06- 29"), Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").append()// 使 用 DataFrameWriterV2 API 
  data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1")  // 使 用
  DataFrameWriterV1 API
 }
}

2)执行完毕后进行测试,注意:小 bug,执行完代码后,如果 spark sql 黑窗口不重新打开是不会刷新数据的,只有把 spark sql 窗口界面重新打开才会刷新数据。如果使用代码查询能看到最新数据

3)关闭,再次进入查询,可以查询到数据

4.5.1.2 OverWrite

1)编写代码,测试

/**
*动态覆盖
*@param sparkSession
*/
def OverWriteTable(sparkSession: SparkSession)={ 
   import sparkSession.implicits._
   val data = sparkSession.createDataset(Array(Student(1003, " 王五", 11, "2021-06-29"),
   Student(1004, "赵六", 10, "2021-06-30")))
   data.writeTo("hadoop_prod.db.test1").overwritePartitions() //动态覆盖,只会刷新所属分区数据
}

2)查询

3)显示,手动指定覆盖分区

def OverWriteTable2(sparkSession: SparkSession) = { 
   import sparkSession.implicits._
   val data = sparkSession.createDataset(Array(Student(1, "s1", 1, "111"), Student(2, "s2", 2, "111")))
   data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30")
}

4)查询,2021-06-30 分区的数据已经被覆盖走

4.6模拟数仓

4.6.1表模型

(1)表模型,底下 6 张基础表,合成一张宽表,再基于宽表统计指标

4.6.2建表语句

(1)建表语句

create table hadoop_prod.db.dwd_member(
   uid int,
   ad_id int,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   unitname string,
   userip string,
   zipcode string,
   dt string)
  using iceberg
   partitioned by(dt);
create table hadoop_prod.db.dwd_member_regtype(
  uid int,
  appkey string,
  appregurl string,
  bdp_uuid string,
  createtime timestamp,
  isranreg string,
  regsource string,
  regsourcename string,
  websiteid int,
  dt string)
  using iceberg
  partitioned by(dt);
create table hadoop_prod.db.dwd_base_ad(
adid int,
adname string,
dn string)
using iceberg
partitioned by (dn) ;
 create table hadoop_prod.db.dwd_base_website(
  siteid int,
  sitename string,
  siteurl string,
 `delete` int,
  createtime timestamp,
  creator string,
  dn string)
using iceberg
partitioned by (dn) ;
create table hadoop_prod.db.dwd_pcentermempaymoney(
  uid int,
  paymoney string,
  siteid int,
  vip_id int,
  dt string,
  dn string)
using iceberg   
 partitioned by(dt,dn);
 create table hadoop_prod.db.dwd_vip_level(
   vip_id int,
   vip_level string,
   start_time timestamp,
   end_time timestamp,
   last_modify_time timestamp,
   max_free string,
   min_free string,
   next_level string,
   operator string,
   dn string)
 using iceberg
 partitioned by(dn);
 create table hadoop_prod.db.dws_member(
  uid int,
  ad_id int,
  fullname string,
  iconurl string,
  lastlogin string,
  mailaddr string,
  memberlevel string,
  password string,
  paymoney string,
  phone string,
  qq string,
  register string,
  regupdatetime string,
  unitname string,
  userip string,
  zipcode string,
  appkey string,
  appregurl string,
  bdp_uuid string,
  reg_createtime timestamp,
  isranreg string,
  regsource string,
  regsourcename string,
  adname string,
  siteid int,
  sitename string,
  siteurl string,
  site_delete string,
  site_createtime string,
  site_creator string,
  vip_id int,
  vip_level string,
  vip_start_time timestamp,
  vip_end_time timestamp,
  vip_last_modify_time timestamp,
  vip_max_free string,
  vip_min_free string,
  vip_next_level string,
  vip_operator string,
 dt string,
dn string)
using iceberg
partitioned by(dt,dn);
 create  table hadoop_prod.db.ads_register_appregurlnum(
  appregurl string,
  num int,
  dt string,
  dn string)
using iceberg
partitioned by(dt);
 create table hadoop_prod.db.ads_register_top3memberpay(
  uid int,
  memberlevel string,
  register string,
  appregurl string,
  regsourcename string,
  adname string,
  sitename string,
  vip_level string,
  paymoney decimal(10,4),
  rownum int,
 dt string,
dn string)
using iceberg
partitioned by(dt);

4.6.3测试数据

(1)测试数据上传到 hadoop,作为第一层 ods

[root@hadoop103 software]# hadoop dfs -mkdir /ods
[root@hadoop103 software]# hadoop dfs -put *.log /ods

4.6.4 编写代码

4.6.4.1 dwd

1)创建目录,划分层级

2)编写所需实体类

package com.atguigu.iceberg.warehouse.bean 
import java.sql.Timestamp
case class BaseWebsite(
  siteid:  Int, sitename: String, siteurl: String, delete: Int, createtime: 
  Timestamp, creator: String,
  dn: String
)
case class MemberRegType(
  uid: Int, appkey: String,
  appregurl: String,
  bdp_uuid: String, createtime: Timestamp,
  isranreg: String, regsource: String, regsourcename: String, websiteid: Int,
  dt: String
)
case class VipLevel(
  vip_id: Int, vip_level: String, start_time: Timestamp, end_time: Timestamp,
  last_modify_time: Timestamp, max_free: String,
  min_free: String, next_level: String, operator: String, dn: String
)
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
25
分享
相关文章
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 于 2023 年 12 月 7 日重磅推出了其全新的 3.0 版本 ~
107698 8
 Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Hadoop入门(一篇就够了)
Hadoop入门(一篇就够了)
21207 3
Hadoop入门(一篇就够了)
Maven 公共代理库
Maven 公共仓库提供了对 maven central、jcenter、google、spring 等常用的 Maven 仓库的镜像功能。用户可以通过页面浏览仓库内容、检索和下载制品。在构建时使用 Maven 个公共仓库地址下载制品速度更快,更稳定。
55360 1
Maven 公共代理库
SparkSQL DatasourceV2 之 Multiple Catalog
SparkSQL DatasourceV2作为Spark2.3引入的特性,在Spark 3.0 preview(2019/12/23)版本中又有了新的改进以更好的支持各类数据源。本文将从catalog角度,介绍新的数据源如何和Spark DatasourceV2进行集成。
SparkSQL DatasourceV2 之 Multiple Catalog
实时计算 Flink版产品使用问题之怎么实现从Oracle数据库读取多个表并将数据写入到Iceberg表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
【BetterBench】2024年都有哪些数学建模竞赛和大数据竞赛?
本文提供了2024年全年的数学建模和大数据竞赛时间表,列出了32个重要竞赛的报名时间、比赛时间、费用及报名地址等详细信息。
673 6
【BetterBench】2024年都有哪些数学建模竞赛和大数据竞赛?
Flink CDC:基于 Apache Flink 的流式数据集成框架
本文整理自阿里云 Flink SQL 团队研发工程师于喜千(yux)在 SECon 全球软件工程技术大会中数据集成专场沙龙的分享。
19482 11
Flink CDC:基于 Apache Flink 的流式数据集成框架
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问