第 1 章 介绍
Apache Iceberg 是一种用于大型分析数据集的开放表格,Iceberge 向 Trino 和 Spark 添加了使用高性能格式的表,就像 Sql 表一样。
Iceberg 为了避免出现不变要的一些意外,表结构和组织并不会实际删除,用户也不需要特意了解分区便可进行快速查询。
- Iceberg 的表支持快速添加、删除、更新或重命名操作
- 将分区列进行隐藏,避免用户错误的使用分区和进行极慢的查询。
- 分区列也会随着表数据量或查询模式的变化而自动更新。
- 表可以根据时间进行表快照,方便用户根据时间进行检查更改。
- 提供版本回滚,方便用户纠错数据。
Iceberg 是为大表而建的,Iceberg 用于生产中,其中单表数据量可包含 10pb 左右数据, 甚至可以在没有分布式 SQL 引擎的情况下读取这些巨量数据。
- 查询计划非常迅速,不需要分布式 SQL 引擎来读取数据
- 高级过滤:可以使用分区和列来过滤查询这些数据
- 可适用于任何云存储
- 表的任何操作都是原子性的,用户不会看到部分或未提交的内容。
- 使用多个并发器进行写入,并使用乐观锁重试的机制来解决兼容性问题
本文demo基于 0.11.1 版本较老,iceberg官网已经没有该版本样例了,同时改版本也不支持一些iceberg的新特性,比如:upsert功能,动态schema变更以及索引和小文件合并等问题。但是不影响对主要API和功能的学习和理解
第 2 章 构建 Iceberg
构建 Iceberge 需要 Grade 5.41 和 java8 或 java11 的环境
2.1 构建 Iceberg
1.上传 iceberg-apache-iceberg-0.11.1.zip,并进行解压
[root@hadoop103 software]# unzip iceberg-apache-iceberg-0.11.1.zip -d /opt/module/ [root@hadoop103 software]# cd /opt/module/iceberg-apache-iceberg-0.11.1/
2.修改对应版本
[root@hadoop103 iceberg-apache-iceberg-0.11.1]# vim versions.props org.apache.flink:* = 1.11.0 org.apache.hadoop:* = 3.1.3 org.apache.hive:hive-metastore = 2.3.7 org.apache.hive:hive-serde = 2.3.7 org.apache.spark:spark-hive_2.12 = 3.0.1 org.apache.hive:hive-exec = 2.3.7 org.apache.hive:hive-service = 2.3.7
3.修改国内镜像
[root@hadoop103 iceberg-apache-iceberg-0.11.1]# vim build.gradle buildscript { repositories { jcenter() gradlePluginPortal() maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/' } maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'} maven { url "http://palantir.bintray.com/releases" } maven { url "https://plugins.gradle.org/m2/" } } allprojects { group = "org.apache.iceberg" version = getProjectVersion() repositories { maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'} maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'} maven { url "http://palantir.bintray.com/releases" } mavenCentral() mavenLocal() } }
4.构建项目
[root@hadoop103 iceberg-apache-iceberg-0.11.1]# ./gradlew build -x test
第 3 章 Spark 操作
3.1.配置参数和 jar 包
1.将构建好的 Iceberg 的 spark 模块 jar 包,复制到 spark jars 下
[root@hadoop103]/opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs/ [root@hadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/ [root@hadoop103 libs]# cd /opt/module/iceberg-apache-iceberg-0.11.1/spark3-runtime/build/libs/ [root@hadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/
2.配置 spark 参数,配置 Spark Sql Catlog,可以用两种方式,基于 hive 和基于 hadoop,这里先选择基于 hadoop。
[root@hadoop103 libs]# cd /opt/module/spark-3.0.1-bin-hadoop2.7/conf/ [root@hadoop103 conf]# vim spark-defaults.conf spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive_prod.type = hive spark.sql.catalog.hive_prod.uri = thrift://hadoop101:9083 spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type = hadoop spark.sql.catalog.hadoop_prod.warehouse = hdfs://mycluster/spark/warehouse spark.sql.catalog.catalog-name.type = hadoop spark.sql.catalog.catalog-name.default-namespace = db spark.sql.catalog.catalog-name.uri = thrift://hadoop101:9083 spark.sql.catalog.catalog-name.warehouse= hdfs://mycluster/spark/warehouse
3.2 Spark sql 操作
- 正在上传…重新上传取消使用 spark sql 创建 iceberg 表,配置完毕后,会多出一个 hadoop_prod.db 数据库,但是注意这个数据库通过 show tables 是看不到的
[root@hadoop103 ~]# spark-sql spark-sql (default)> use hadoop_prod.db; create table testA( id bigint, name string, age int, dt string) USING iceberg PARTITIONED by(dt);
2.插入数据
spark-sql (default)> insert into testA values(1,'张三',18,'2021-06-21');
3.查询
spark-sql (default)> select *from testA;
3.2.1over write 操作
(1)覆盖操作与 hive 一样,会将原始数据重新刷新
spark-sql (default)> insert overwrite testA values(2,'李四',20,'2021-06-21'); spark-sql (default)> select *from testA;
3.2.2动态覆盖
1.Spark 的默认覆盖模式是静态的,但在写入 iceberg 时建议使用动态覆盖模式。静态覆盖模式需要制定分区列,动态覆盖模式不需要。
spark-sql (default)> insert overwrite testA values(2,'李四',20,'2021-06-21'); spark-sql (default)> select *from testA;
2.设置动态覆盖模式,修改 spark-default.conf,添加对应参数
[root@hadoop103 conf]# vim spark-defaults.conf spark.sql.sources.partitionOverwriteMode=dynamic
3.创建一张表结构与 testA 完全一致的表 testB
create table hadoop_prod.db.testB( id bigint, name string, age int, dt string) USING iceberg PARTITIONED by(dt);
4.向 testA 表中再插入一条数据
spark-sql (default)> use hadoop_prod.db; spark-sql (default)> insert into testA values(1,'张三',18,'2021-06-22');
5.查询 testA 表,此时 testA 表中有两条记录
spark-sql (default)> select *from testA;
6.通过动态覆盖模式将 A 表插入到 B 表中
spark-sql (default)> insert overwrite testB select *from testA;
7.查询 testB 表,可以看到效果与 hive 中的动态分区一样,自动根据列的顺序进行匹配插入,无须手动指定分区。
spark-sql (default)> select *from testB;
3.2.3静态覆盖
1.静态覆盖,则跟 hive 插入时手动指定分区一致,需要手动指定分区列的值
spark-sql (default)> insert overwrite testB Partition(dt='2021-06-26') select id,name,age from testA;
2.查询表数据
spark-sql (default)> select *from testB;
3.2.4删除数据
1.iceberg 并不会物理删除数据,下面演示 delete 操作,根据分区列进行删除 testB 表数据
spark-sql (default)> delete from testB where dt >='2021-06-21' and dt <='2021-06-26';
2.提示删除成功,再次查询数据。发现表中已无数据,但是存在 hdfs 上的物理并没有实际删除
3.查看 hdfs 数据,仍然存在。
3.2.5历史快照
1.每张表都拥有一张历史表,历史表的表名为当前表加上后缀.history,注意:查询历史表的时候必须是表的全称,不可用先切到 hadoop.db 库中再查 testB
spark-sql (default)> select *from hadoop_prod.db.testB.history;
2.可以查看到每次操作后的对应的快照记录,也可以查询对应快照表,快照表的表名在 原表基础上加上.snapshots,也是一样必须是表的全称不能简写
spark-sql (default)> select *from hadoop_prod.db.testB.snapshots;
3.可以在看到 commit 的时间,snapshot 快照的 id,parent_id 父节点,operation 操作类型, 已经 summary 概要,summary 概要字段中可以看到数据量大小,总条数,路径等信息。
两张表也可以根据 snapshot_id 快照 id 进行 join 关联查询。
spark-sql (default)> select *from hadoop_prod.db.testB.history a join hadoop_prod.db.testB.snapshots b on a.snapshot_id=b.snapshot_id ;
4.知道了快照表和快照信息后,可以根据快照 id 来查询具体的历史信息,发进行检测是否误操作,如果是误操作则可通过 spark 重新刷新数据。查询方式如下
scala> spark.read.option("snapshot-id","5549650043576786799").format("iceberg").load("/hive/w arehouse/db/testB").show
3.2.6隐藏分区(有 bug 时区不对)
1.days 函数
1)上面演示了创建分区表, 接下来演示创建隐藏分区表。隐藏分区支持的函数有 years,months,days,hours,bucket,truncate。比如接下来创建一张 testC 表,表中有id,name 和 ts时间戳。
create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (days(ts));
2)创建成功分别往里面插入不同天时间戳的数据
spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624773600 as timestamp)),(2,'李四',cast(1624860000 as timestamp));
3)插入成功之后再来查询表数据。
spark-sql (default)> select *from hadoop_prod.db.testC;
4)可以看到有两条数据,并且日期也不是同一天,查看 hdfs 上对应的分区。已经自动按天进行了分区。
2.years 函数
1)删除 testC 表,重新建表,字段还是不变,分区字段使用 years 函数
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (years(ts));
2)同样,插入两条不同年时间戳的数据,进行查询对比
spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624860000 as timestamp)),(2,'李四',cast(1593324000 as timestamp));
3)查询数据
spark-sql (default)> select *from hadoop_prod.db.testC;
4)再查看 hdfs 对应的地址,已经按年建好分区
3.month 函数
1)删除 testC 表,重新建表,字段不变, 使用 month 函数进行分区
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (months(ts));
2)同样,插入不同月份时间戳的两条记录
spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624860000 as timestamp)),(2,'李四',cast(1622181600 as timestamp));
3)查询数据和 hdfs 对应地址
spark-sql (default)> select *from hadoop_prod.db.testC;
4.hours 函数
1)删除 testC 表,重新建表,字段不变使用 hours 函数
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (hours(ts));
2)插入两条不同小时的时间戳数据
spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1622181600 as timestamp)),(2,'李四',cast(1622178000 as timestamp));
3)查询数据和 hdfs 地址
spark-sql (default)> select *from hadoop_prod.db.testC;
4)发现时区不对,修改对应参数
root@hadoop103 ~]# vim /opt/module/spark-3.0.1-bin-hadoop2.7/conf/spark-defaults.conf spark.sql.session.timeZone=GMT+8
5)再次启动 spark sql 插入数据
[root@hadoop103 ~]# spark-sql spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1622181600 as timestamp)),(2,'李四',cast(1622178000 as timestamp));
6)查看 hdfs 路径,还是错误分区目录(bug)
5.bucket 函数(有 bug)
1)删除 testC 表,重新创建,表字段不变,使用 bucket 函数。分桶 hash 算法采用 Murmur3 hash,官网介绍 https://iceberg.apache.org/spec/#partition-transforms
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (bucket(16,id));
2)插入一批测试数据,为什么分多批插入,有 bug:如果一批数据中有数据被分到同一个桶里会报错
insert into hadoop_prod.db.testC values (1,'张 1',cast(1622152800 as timestamp)),(1,'李 1',cast(1622178000 as timestamp)), (2,'张 2',cast(1622152800 as timestamp)),(3,'李 2',cast(1622178000 as timestamp)), (4,'张 3',cast(1622152800 as timestamp)),(6,'李 3',cast(1622178000 as timestamp)), (5,'张 4',cast(1622152800 as timestamp)),(8,'李 4',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values (9,'张 5',cast(1622152800 as timestamp)),(10,'李 5',cast(1622178000 as timestamp)), (11,'张 6',cast(1622152800 as timestamp)),(12,'李 6',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values (13,'张 7',cast(1622152800 as timestamp)),(14,'李 7',cast(1622178000 as timestamp)), (15,'张 8',cast(1622152800 as timestamp)),(16,'李 8',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values (17,'张 9',cast(1622152800 as timestamp)),(18,'李 9',cast(1622178000 as timestamp)), (18,'张 10',cast(1622152800 as timestamp)),(20,'李 10',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values (1001,'张 9',cast(1622152800 as timestamp)),(1003,'李 9',cast(1622178000 as timestamp)), (1002,'张 10',cast(1622152800 as timestamp)),(1004,'李 10',cast(1622178000 as timestamp));
3)查看表数据和 hdfs 路径
spark-sql (default)> select *from hadoop_prod.db.testC;
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (truncate(4,id));
6.truncate 函数
1)删除表,重新建表,字段不变,使用 truncate 函数,截取长度来进行分区
spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC( id bigint, name string, ts timestamp) using iceberg partitioned by (truncate(4,id));
2)插入一批测试数据
insert into hadoop_prod.db.testC values (10010001,' 张 1',cast(1622152800 as timestamp)),(10010002,' 李 1',cast(1622178000 as timestamp)), (10010003,' 张 2',cast(1622152800 as timestamp)),(10020001,' 李 2',cast(1622178000 as timestamp)), (10020002,' 张 3',cast(1622152800 as timestamp)),(10030001,' 李 3',cast(1622178000 as timestamp)), (10040001,' 张 4',cast(1622152800 as timestamp)),(10050001,' 李 4',cast(1622178000 as timestamp));
3)查询表数据和 hdfs 地址,分区目录为 id 数/4 得到的值(计算方式是 /不是%)。
spark-sql (default)> select *from hadoop_prod.db.testC;