Chapter 4 DataFrame Operations
4.1 Configure Resources
1) Copy the client configuration files of your Hadoop cluster into the resources directory so that local-mode debugging is easier.
4.2 Configure pom.xml
1) Configure the required dependencies
<properties>
    <spark.version>3.0.1</spark.version>
    <scala.version>2.12.10</scala.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.22</slf4j.version>
    <iceberg.version>0.11.1</iceberg.version>
</properties>
<dependencies>
    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-runtime</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-extensions</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.1</version>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
4.3 Reading a Table
package com.atguigu.iceberg.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object TableOperations {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse")
      .set("spark.sql.catalog.catalog-name.type", "hadoop")
      .set("spark.sql.catalog.catalog-name.default-namespace", "default")
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.sql.session.timeZone", "GMT+8")
      .setMaster("local[*]")
      .setAppName("table_operations")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    readTale(sparkSession)
  }

  /**
   * Read an Iceberg table
   * @param sparkSession
   */
  def readTale(sparkSession: SparkSession) = {
    // Three ways to read the table
    sparkSession.table("hadoop_prod.db.testA").show()
    sparkSession.read.format("iceberg").load("hadoop_prod.db.testA").show()
    sparkSession.read.format("iceberg").load("/hive/warehouse/db/testA").show() // point the path at the table directory, not at a specific file
  }
}
4.4 Reading Snapshots
def readSnapShots(sparkSession: SparkSession) = {
  // Query the hadoop_prod.db.testA.snapshots metadata table to find the snapshot times and snapshot ids

  // Read by timestamp; it must be a millisecond timestamp, not a formatted date string
  sparkSession.read
    .option("as-of-timestamp", "1624961454000") // millisecond timestamp; reads the most recent snapshot committed at or before this time
    .format("iceberg")
    .load("hadoop_prod.db.testA").show()

  // Read by snapshot id
  sparkSession.read
    .option("snapshot-id", "9054909815461789342")
    .format("iceberg")
    .load("hadoop_prod.db.testA").show()
}
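The comments above rely on the snapshots metadata table. As a minimal sketch (assuming the same hadoop_prod.db.testA table), the available snapshot ids and commit times can be listed like this before picking a value for snapshot-id or as-of-timestamp:

def listSnapshots(sparkSession: SparkSession) = {
  // Iceberg exposes table history through metadata tables; each row of "snapshots"
  // contains committed_at, snapshot_id, parent_id, operation, etc.
  sparkSession.table("hadoop_prod.db.testA.snapshots")
    .select("committed_at", "snapshot_id", "operation")
    .show(truncate = false)
}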
4.5 Writing to Tables
4.5.1 Writing Data and Creating a Table
1) Write the code and run it
case class Student(id: Int, name: String, age: Int, dt: String)

def writeAndCreateTable(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  import org.apache.spark.sql.functions._
  val data = sparkSession.createDataset[Student](Array(
    Student(1001, "张三", 18, "2021-06-28"),
    Student(1002, "李四", 19, "2021-06-29"),
    Student(1003, "王五", 20, "2021-06-29")))
  data.writeTo("hadoop_prod.db.test1")
    .partitionedBy(col("dt")) // use dt as the partition column
    .create()
}
2) Verify: open the spark-sql shell and check the table schema and data
spark-sql (default)> desc test1;
spark-sql (default)> select * from test1;
3) Check HDFS to confirm that the data is partitioned by dt. The partition layout can also be inspected from code, as sketched below.
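A minimal sketch of that code-side check, using Iceberg's files metadata table and assuming the hadoop_prod.db.test1 table created above:

def showPartitionFiles(sparkSession: SparkSession) = {
  // Each row of the "files" metadata table describes one data file, including its
  // partition tuple and file_path, so the dt partitioning is visible without touching HDFS.
  sparkSession.table("hadoop_prod.db.test1.files")
    .select("partition", "file_path", "record_count")
    .show(truncate = false)
}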
4.5.2 Writing Data
4.5.2.1 Append
1) Write the code and run it
def AppendTable(sparkSession: SparkSession) = {
  // Two ways to append
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(
    Student(1003, "王五", 11, "2021-06-29"),
    Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").append()                              // DataFrameWriterV2 API
  data.write.format("iceberg").mode("append").save("hadoop_prod.db.test1")   // DataFrameWriterV1 API
}
2) After the code finishes, test the result. Note a small quirk: if the spark-sql shell is left open, it will not refresh and show the newly appended rows; only reopening the spark-sql shell makes the new data visible. Querying from code, however, returns the latest data immediately, as in the sketch below.
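A minimal sketch of such a code-side check, reusing the hadoop_prod.db.test1 table from the append example:

def verifyAppend(sparkSession: SparkSession) = {
  // Reading through the catalog resolves the table's latest snapshot,
  // so the rows appended above are visible without restarting anything.
  sparkSession.table("hadoop_prod.db.test1")
    .orderBy("id")
    .show()
}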
3) Close the shell, open it again, and run the query; the appended data is now returned.
4.5.2.2 Overwrite
1) Write the code and test it
/**
 * Dynamic overwrite
 * @param sparkSession
 */
def OverWriteTable(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(
    Student(1003, "王五", 11, "2021-06-29"),
    Student(1004, "赵六", 10, "2021-06-30")))
  data.writeTo("hadoop_prod.db.test1").overwritePartitions() // dynamic overwrite: only the partitions touched by the new data are replaced
}
2) Query the table
3) Explicitly specify the partition to overwrite
def OverWriteTable2(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  val data = sparkSession.createDataset(Array(
    Student(1, "s1", 1, "111"),
    Student(2, "s2", 2, "111")))
  data.writeTo("hadoop_prod.db.test1").overwrite($"dt" === "2021-06-30") // rows matching the filter are deleted and replaced by the new data
}
4) Query again: the data in the 2021-06-30 partition has been overwritten.
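A minimal sketch of verifying this from code; it assumes the test1 table and the rows written above:

def checkOverwrite(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  // After overwrite($"dt" === "2021-06-30"), the matching rows were deleted and the
  // new rows (dt = "111") were appended, so this filter should return no rows.
  sparkSession.table("hadoop_prod.db.test1")
    .where($"dt" === "2021-06-30")
    .show()
}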
4.6 Simulating a Data Warehouse
4.6.1 Table Model
(1) Table model: the six base tables at the bottom are joined into one wide table, and the metrics are then computed on top of the wide table.
4.6.2 DDL
(1) Table creation statements
create table hadoop_prod.db.dwd_member(
    uid int,
    ad_id int,
    birthday string,
    email string,
    fullname string,
    iconurl string,
    lastlogin string,
    mailaddr string,
    memberlevel string,
    password string,
    paymoney string,
    phone string,
    qq string,
    register string,
    regupdatetime string,
    unitname string,
    userip string,
    zipcode string,
    dt string)
using iceberg
partitioned by(dt);

create table hadoop_prod.db.dwd_member_regtype(
    uid int,
    appkey string,
    appregurl string,
    bdp_uuid string,
    createtime timestamp,
    isranreg string,
    regsource string,
    regsourcename string,
    websiteid int,
    dt string)
using iceberg
partitioned by(dt);

create table hadoop_prod.db.dwd_base_ad(
    adid int,
    adname string,
    dn string)
using iceberg
partitioned by(dn);

create table hadoop_prod.db.dwd_base_website(
    siteid int,
    sitename string,
    siteurl string,
    `delete` int,
    createtime timestamp,
    creator string,
    dn string)
using iceberg
partitioned by(dn);

create table hadoop_prod.db.dwd_pcentermempaymoney(
    uid int,
    paymoney string,
    siteid int,
    vip_id int,
    dt string,
    dn string)
using iceberg
partitioned by(dt,dn);

create table hadoop_prod.db.dwd_vip_level(
    vip_id int,
    vip_level string,
    start_time timestamp,
    end_time timestamp,
    last_modify_time timestamp,
    max_free string,
    min_free string,
    next_level string,
    operator string,
    dn string)
using iceberg
partitioned by(dn);

create table hadoop_prod.db.dws_member(
    uid int,
    ad_id int,
    fullname string,
    iconurl string,
    lastlogin string,
    mailaddr string,
    memberlevel string,
    password string,
    paymoney string,
    phone string,
    qq string,
    register string,
    regupdatetime string,
    unitname string,
    userip string,
    zipcode string,
    appkey string,
    appregurl string,
    bdp_uuid string,
    reg_createtime timestamp,
    isranreg string,
    regsource string,
    regsourcename string,
    adname string,
    siteid int,
    sitename string,
    siteurl string,
    site_delete string,
    site_createtime string,
    site_creator string,
    vip_id int,
    vip_level string,
    vip_start_time timestamp,
    vip_end_time timestamp,
    vip_last_modify_time timestamp,
    vip_max_free string,
    vip_min_free string,
    vip_next_level string,
    vip_operator string,
    dt string,
    dn string)
using iceberg
partitioned by(dt,dn);

create table hadoop_prod.db.ads_register_appregurlnum(
    appregurl string,
    num int,
    dt string,
    dn string)
using iceberg
partitioned by(dt);

create table hadoop_prod.db.ads_register_top3memberpay(
    uid int,
    memberlevel string,
    register string,
    appregurl string,
    regsourcename string,
    adname string,
    sitename string,
    vip_level string,
    paymoney decimal(10,4),
    rownum int,
    dt string,
    dn string)
using iceberg
partitioned by(dt);
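To connect the DDL back to the table model in 4.6.1, here is a hedged sketch of how some of the dwd tables could be joined toward the dws_member wide table. The join keys (uid and dt between dwd_member and dwd_member_regtype, ad_id to adid for the ad dimension) are inferred from the column names above and are assumptions, not the project's final ETL code:

def wideTableJoinSketch(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  // Hypothetical partial join of three of the six base tables; the full wide table
  // would add the website, payment, and vip dimensions in the same way.
  val member  = sparkSession.table("hadoop_prod.db.dwd_member")
  val regType = sparkSession.table("hadoop_prod.db.dwd_member_regtype")
  val baseAd  = sparkSession.table("hadoop_prod.db.dwd_base_ad")

  member
    .join(regType, Seq("uid", "dt"), "left")
    .join(baseAd, member("ad_id") === baseAd("adid"), "left")
    .select($"uid", $"fullname", $"appregurl", $"adname", $"dt")
    .show()
}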
4.6.3 Test Data
(1) Upload the test data to Hadoop as the first layer (ODS)
[root@hadoop103 software]# hadoop dfs -mkdir /ods
[root@hadoop103 software]# hadoop dfs -put *.log /ods
4.6.4 Writing the Code
4.6.4.1 DWD Layer
1) Create the package structure to separate the layers
2) Write the required case classes
package com.atguigu.iceberg.warehouse.bean

import java.sql.Timestamp

case class BaseWebsite(
    siteid: Int,
    sitename: String,
    siteurl: String,
    delete: Int,
    createtime: Timestamp,
    creator: String,
    dn: String)

case class MemberRegType(
    uid: Int,
    appkey: String,
    appregurl: String,
    bdp_uuid: String,
    createtime: Timestamp,
    isranreg: String,
    regsource: String,
    regsourcename: String,
    websiteid: Int,
    dt: String)

case class VipLevel(
    vip_id: Int,
    vip_level: String,
    start_time: Timestamp,
    end_time: Timestamp,
    last_modify_time: Timestamp,
    max_free: String,
    min_free: String,
    next_level: String,
    operator: String,
    dn: String)
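As a hedged illustration of how these case classes line up with the DDL in 4.6.2, the sketch below builds a Dataset[VipLevel] from a single made-up row and appends it to the dwd_vip_level table; the sample values (including the dn value "webA") are invented for the example and are not taken from the test data:

def writeVipLevelSample(sparkSession: SparkSession) = {
  import sparkSession.implicits._
  // One made-up VipLevel row, only to show that the case class matches the table schema
  val sample = Seq(
    VipLevel(1, "normal",
      java.sql.Timestamp.valueOf("2021-06-28 00:00:00"),
      java.sql.Timestamp.valueOf("2021-12-31 00:00:00"),
      java.sql.Timestamp.valueOf("2021-06-28 00:00:00"),
      "0", "0", "2", "admin", "webA")
  ).toDS()
  // Append into the Iceberg table created in 4.6.2 whose columns mirror the case class fields
  sample.writeTo("hadoop_prod.db.dwd_vip_level").append()
}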