Iceberg实战踩坑指南(二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Iceberg实战踩坑指南

 4  DataFrame 操作

4.1.配置 Resources

1)将自己 hadoop 集群的客户端配置文件复制到 resource 下,方便 local 模式调试

4.2配置 pom.xml

1)配置相关依赖

<properties>
    <spark.version>3.0.1</spark.version>
    <scala.version>2.12.10</scala.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.22</slf4j.version>
    <iceberg.version>0.11.1</iceberg.version>
</properties>
<dependencies>
    <!-- Spark 的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- 引入Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-runtime</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-extensions</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
<build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.1</version>
        <executions>
            <execution>
                <id>compile-scala</id>
                <goals>
                    <goal>add-source</goal>
                    <goal>compile</goal>
                </goals>
            </execution>
            <execution>
                <id>test-compile-scala</id>
                <goals>
                    <goal>add-source</goal>
                    <goal>testCompile</goal>
               </goals>
           </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
            </manifest>
          </archive>
          <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
     </plugin>
   </plugins>
</build>

4.3读取表

package com.atguigu.iceberg.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object TableOperations {
  def main(args: Array[String]): Unit = { val sparkConf = new SparkConf()
   .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
   .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
   .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
   .set("spark.sql.catalog.catalog-name.type", "hadoop")
   .set("spark.sql.catalog.catalog-name.default-namespace", "default")
   .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
   .set("spark.sql.session.timeZone", "GMT+8")
   .setMaster("local[*]").setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 
    readTale(sparkSession)
}
/**
*读取iceberg 的表
*@param sparkSession
*/
  def readTale(sparkSession: SparkSession) = {
     //三种方式sparkSession.table("hadoop_prod.db.testA").show()
     sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show() 
     sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show()// 路径到表就行,不要到具体文件
  }
}

4.4读取快照

 def readSnapShots(sparkSession: SparkSession) = {
  //根据查询 hadoop_prod.db.testA.snapshots 快照表可以知道快照时间和快照id
  //根据时间戳读取,必须是时间戳 不能使用格式化后的时间
  sparkSession.read
  .option("as-of-timestamp", "1624961454000") //毫秒时间戳,查询比该值时间更早的快照
  .format("iceberg")
  .load("hadoop_prod.db.testA").show()
  //根据快照 id 查询
  sparkSession.read
     .option("snapshot-id", "9054909815461789342")
     .format("iceberg")
     .load("hadoop_prod.db.testA").show()
  }
}

4.5写入表

4.5.1写入数据并创建表

1)编写代码执行

case class Student(id: Int, name: String, age: Int, dt: String)
  def writeAndCreateTable(sparkSession: SparkSession) = { 
    import  sparkSession.implicits._
    import org.apache.spark.sql.functions._
    val data = sparkSession.createDataset[Student](Array(Student(1001, " 张 三 ", 18, "2021-06-28"),
    Student(1002, "李四", 19, "2021-06-29"), Student(1003, "王五", 20, "2021-06-29")))
    data.writeTo("hadoop_prod.db.test1").partitionedBy(col("dt")) //指定dt 为分区列
.create()
  }
}

2)验证,进入 spark sql 窗口,查看表结构和表数据

spark-sql (default)> desc test1;
spark-sql (default)> select *from test1;
spark-sql (default)> desc test1;

spark-sql (default)> select *from test1;

spark-sql (default)> select *from test1;

3)查看 hdfs,是否按 dt 进行分区

4.5.1写数据

4.5.1.1  Append

1)编写代码,执行

def AppendTable(sparkSession: SparkSession) = {
  //两种方式
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(Student(1003, "王五", 11, "2021-06- 29"), Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").append()// 使 用 DataFrameWriterV2 API 
  data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1")  // 使 用
  DataFrameWriterV1 API
 }
}

2)执行完毕后进行测试,注意:小 bug,执行完代码后,如果 spark sql 黑窗口不重新打开是不会刷新数据的,只有把 spark sql 窗口界面重新打开才会刷新数据。如果使用代码查询能看到最新数据

3)关闭,再次进入查询,可以查询到数据

4.5.1.2 OverWrite

1)编写代码,测试

/**
*动态覆盖
*@param sparkSession
*/
def OverWriteTable(sparkSession: SparkSession)={ 
   import sparkSession.implicits._
   val data = sparkSession.createDataset(Array(Student(1003, " 王五", 11, "2021-06-29"),
   Student(1004, "赵六", 10, "2021-06-30")))
   data.writeTo("hadoop_prod.db.test1").overwritePartitions() //动态覆盖,只会刷新所属分区数据
}

2)查询

3)显示,手动指定覆盖分区

def OverWriteTable2(sparkSession: SparkSession) = { 
   import sparkSession.implicits._
   val data = sparkSession.createDataset(Array(Student(1, "s1", 1, "111"), Student(2, "s2", 2, "111")))
   data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30")
}

4)查询,2021-06-30 分区的数据已经被覆盖走

4.6模拟数仓

4.6.1表模型

(1)表模型,底下 6 张基础表,合成一张宽表,再基于宽表统计指标

4.6.2建表语句

(1)建表语句

create table hadoop_prod.db.dwd_member(
   uid int,
   ad_id int,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   unitname string,
   userip string,
   zipcode string,
   dt string)
  using iceberg
   partitioned by(dt);
create table hadoop_prod.db.dwd_member_regtype(
  uid int,
  appkey string,
  appregurl string,
  bdp_uuid string,
  createtime timestamp,
  isranreg string,
  regsource string,
  regsourcename string,
  websiteid int,
  dt string)
  using iceberg
  partitioned by(dt);
create table hadoop_prod.db.dwd_base_ad(
adid int,
adname string,
dn string)
using iceberg
partitioned by (dn) ;
 create table hadoop_prod.db.dwd_base_website(
  siteid int,
  sitename string,
  siteurl string,
 `delete` int,
  createtime timestamp,
  creator string,
  dn string)
using iceberg
partitioned by (dn) ;
create table hadoop_prod.db.dwd_pcentermempaymoney(
  uid int,
  paymoney string,
  siteid int,
  vip_id int,
  dt string,
  dn string)
using iceberg   
 partitioned by(dt,dn);
 create table hadoop_prod.db.dwd_vip_level(
   vip_id int,
   vip_level string,
   start_time timestamp,
   end_time timestamp,
   last_modify_time timestamp,
   max_free string,
   min_free string,
   next_level string,
   operator string,
   dn string)
 using iceberg
 partitioned by(dn);
 create table hadoop_prod.db.dws_member(
  uid int,
  ad_id int,
  fullname string,
  iconurl string,
  lastlogin string,
  mailaddr string,
  memberlevel string,
  password string,
  paymoney string,
  phone string,
  qq string,
  register string,
  regupdatetime string,
  unitname string,
  userip string,
  zipcode string,
  appkey string,
  appregurl string,
  bdp_uuid string,
  reg_createtime timestamp,
  isranreg string,
  regsource string,
  regsourcename string,
  adname string,
  siteid int,
  sitename string,
  siteurl string,
  site_delete string,
  site_createtime string,
  site_creator string,
  vip_id int,
  vip_level string,
  vip_start_time timestamp,
  vip_end_time timestamp,
  vip_last_modify_time timestamp,
  vip_max_free string,
  vip_min_free string,
  vip_next_level string,
  vip_operator string,
 dt string,
dn string)
using iceberg
partitioned by(dt,dn);
 create  table hadoop_prod.db.ads_register_appregurlnum(
  appregurl string,
  num int,
  dt string,
  dn string)
using iceberg
partitioned by(dt);
 create table hadoop_prod.db.ads_register_top3memberpay(
  uid int,
  memberlevel string,
  register string,
  appregurl string,
  regsourcename string,
  adname string,
  sitename string,
  vip_level string,
  paymoney decimal(10,4),
  rownum int,
 dt string,
dn string)
using iceberg
partitioned by(dt);

4.6.3测试数据

(1)测试数据上传到 hadoop,作为第一层 ods

[root@hadoop103 software]# hadoop dfs -mkdir /ods
[root@hadoop103 software]# hadoop dfs -put *.log /ods

4.6.4 编写代码

4.6.4.1 dwd

1)创建目录,划分层级

2)编写所需实体类

package com.atguigu.iceberg.warehouse.bean 
import java.sql.Timestamp
case class BaseWebsite(
  siteid:  Int, sitename: String, siteurl: String, delete: Int, createtime: 
  Timestamp, creator: String,
  dn: String
)
case class MemberRegType(
  uid: Int, appkey: String,
  appregurl: String,
  bdp_uuid: String, createtime: Timestamp,
  isranreg: String, regsource: String, regsourcename: String, websiteid: Int,
  dt: String
)
case class VipLevel(
  vip_id: Int, vip_level: String, start_time: Timestamp, end_time: Timestamp,
  last_modify_time: Timestamp, max_free: String,
  min_free: String, next_level: String, operator: String, dn: String
)
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
存储 分布式计算 监控
深入浅出 HBase 实战 | 青训营笔记
Hbase是一种NoSQL数据库,这意味着它不像传统的RDBMS数据库那样支持SQL作为查询语言。Hbase是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多RDBMS系统的特性,比如列类型,辅助索引,触发器,和高级查询语言等待。
1113 0
深入浅出 HBase 实战 | 青训营笔记
|
SQL 分布式计算 DataX
HIVE3 深度剖析 (下篇)
HIVE3 深度剖析 (下篇)
|
SQL 存储 分布式计算
HIVE3 深度剖析 (上篇)
HIVE3 深度剖析 (上篇)
|
SQL 分布式计算 Hadoop
Iceberg实战踩坑指南(一)
Iceberg实战踩坑指南
1368 0
|
SQL 消息中间件 监控
Iceberg实战踩坑指南(四)
Iceberg实战踩坑指南
351 0
Iceberg实战踩坑指南(三)
Iceberg实战踩坑指南
158 0
|
SQL 存储 分布式计算
Iceberg原理和项目使用技巧
Iceberg原理和项目使用技巧
863 0
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。
|
SQL 存储 分布式计算
手把手教学hive on spark,还不会的小伙伴快上车了
Hive3.1.2源码编译+Spark3.0.0+Hadoop3.1.3
459 0
|
存储 分布式计算 负载均衡
深入浅出 HBase 实战|青训营笔记
1.介绍 HBase 的适用场景和数据模型;2.分析 HBase 的整体架构和模块设计;3.针对大数据场景 HBase 的解决方案
260 0
深入浅出 HBase 实战|青训营笔记