Iceberg实战踩坑指南(一)

简介: Iceberg实战踩坑指南

 1  介绍

Apache Iceberg 是一种用于大型分析数据集的开放表格,Iceberge 向 Trino 和 Spark 添加了使用高性能格式的表,就像 Sql 表一样。

Iceberg 为了避免出现不变要的一些意外,表结构和组织并不会实际删除,用户也不需要特意了解分区便可进行快速查询。

  1. Iceberg 的表支持快速添加、删除、更新或重命名操作
  2. 将分区列进行隐藏,避免用户错误的使用分区和进行极慢的查询。
  3. 分区列也会随着表数据量或查询模式的变化而自动更新。
  4. 表可以根据时间进行表快照,方便用户根据时间进行检查更改。
  5. 提供版本回滚,方便用户纠错数据。

Iceberg 是为大表而建的,Iceberg 用于生产中,其中单表数据量可包含 10pb 左右数据, 甚至可以在没有分布式 SQL 引擎的情况下读取这些巨量数据。

  1. 查询计划非常迅速,不需要分布式 SQL 引擎来读取数据
  2. 高级过滤:可以使用分区和列来过滤查询这些数据
  3. 可适用于任何云存储
  4. 表的任何操作都是原子性的,用户不会看到部分或未提交的内容。
  5. 使用多个并发器进行写入,并使用乐观锁重试的机制来解决兼容性问题

本文demo基于 0.11.1 版本较老,iceberg官网已经没有该版本样例了,同时改版本也不支持一些iceberg的新特性,比如:upsert功能,动态schema变更以及索引和小文件合并等问题。但是不影响对主要API和功能的学习和理解

 2  构建 Iceberg

构建 Iceberge 需要 Grade 5.41 和 java8 或 java11 的环境

2.1 构建 Iceberg

  1.上传 iceberg-apache-iceberg-0.11.1.zip,并进行解压

[root@hadoop103 software]# unzip iceberg-apache-iceberg-0.11.1.zip -d /opt/module/ 
[root@hadoop103 software]# cd /opt/module/iceberg-apache-iceberg-0.11.1/

   2.修改对应版本

[root@hadoop103 iceberg-apache-iceberg-0.11.1]# vim versions.props org.apache.flink:* = 1.11.0
org.apache.hadoop:* = 3.1.3
org.apache.hive:hive-metastore = 2.3.7
org.apache.hive:hive-serde = 2.3.7
org.apache.spark:spark-hive_2.12 = 3.0.1
org.apache.hive:hive-exec = 2.3.7
org.apache.hive:hive-service = 2.3.7

    3.修改国内镜像

[root@hadoop103 iceberg-apache-iceberg-0.11.1]# vim build.gradle buildscript {
repositories { jcenter() gradlePluginPortal()
maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/' } maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'} maven { url "http://palantir.bintray.com/releases" }
maven { url "https://plugins.gradle.org/m2/" }
}
allprojects {
group = "org.apache.iceberg" version = getProjectVersion() repositories {
maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'} maven{ url 'http://maven.aliyun.com/nexus/content/repositories/jcenter'} maven { url "http://palantir.bintray.com/releases" }
mavenCentral() mavenLocal()
}
}

  4.构建项目

[root@hadoop103 iceberg-apache-iceberg-0.11.1]# ./gradlew build -x test

 3  Spark 操作

3.1.配置参数和 jar 

1.将构建好的 Iceberg 的 spark 模块 jar 包,复制到 spark jars 下

[root@hadoop103]/opt/module/iceberg-apache-iceberg-0.11.1/spark3-extensions/build/libs/ [root@hadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/ [root@hadoop103 libs]#  cd
/opt/module/iceberg-apache-iceberg-0.11.1/spark3-runtime/build/libs/
[root@hadoop103 libs]# cp *.jar /opt/module/spark-3.0.1-bin-hadoop2.7/jars/

2.配置 spark 参数,配置 Spark Sql Catlog,可以用两种方式,基于 hive 和基于 hadoop,这里先选择基于 hadoop。

[root@hadoop103 libs]# cd /opt/module/spark-3.0.1-bin-hadoop2.7/conf/ 
[root@hadoop103 conf]# vim spark-defaults.conf spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hive_prod.type = hive spark.sql.catalog.hive_prod.uri = thrift://hadoop101:9083
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog 
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://mycluster/spark/warehouse
spark.sql.catalog.catalog-name.type = hadoop spark.sql.catalog.catalog-name.default-namespace = db spark.sql.catalog.catalog-name.uri = thrift://hadoop101:9083
spark.sql.catalog.catalog-name.warehouse= hdfs://mycluster/spark/warehouse

3.2 Spark sql 操作

  1. 正在上传…重新上传取消使用 spark sql 创建 iceberg 表,配置完毕后,会多出一个 hadoop_prod.db 数据库,但是注意这个数据库通过 show tables 是看不到的
[root@hadoop103 ~]# spark-sql
spark-sql (default)> use hadoop_prod.db; create table testA(
id bigint, name string, age int,
dt string) USING iceberg
PARTITIONED by(dt);

2.插入数据

spark-sql (default)> insert into testA values(1,'张三',18,'2021-06-21');

3.查询

spark-sql (default)> select *from testA;

3.2.1over write 操作

(1)覆盖操作与 hive 一样,会将原始数据重新刷新

spark-sql (default)> insert overwrite testA values(2,'李四',20,'2021-06-21'); 
spark-sql (default)> select *from testA;

3.2.2动态覆盖

1.Spark 的默认覆盖模式是静态的,但在写入 iceberg 时建议使用动态覆盖模式。静态覆盖模式需要制定分区列,动态覆盖模式不需要。

spark-sql (default)> insert overwrite testA values(2,'李四',20,'2021-06-21'); 
spark-sql (default)> select *from testA;

2.设置动态覆盖模式,修改 spark-default.conf,添加对应参数

[root@hadoop103 conf]# vim spark-defaults.conf 
spark.sql.sources.partitionOverwriteMode=dynamic

3.创建一张表结构与 testA 完全一致的表 testB

create table hadoop_prod.db.testB( id bigint,
name string, age int,
dt string) USING iceberg
PARTITIONED by(dt);

4.向 testA 表中再插入一条数据

spark-sql (default)> use hadoop_prod.db;
spark-sql (default)> insert into testA values(1,'张三',18,'2021-06-22');

5.查询 testA 表,此时 testA 表中有两条记录

spark-sql (default)> select *from testA;

6.通过动态覆盖模式将 A 表插入到 B 表中

spark-sql (default)> insert overwrite testB select *from testA;

7.查询 testB 表,可以看到效果与 hive 中的动态分区一样,自动根据列的顺序进行匹配插入,无须手动指定分区。

spark-sql (default)> select *from testB;

3.2.3静态覆盖

1.静态覆盖,则跟 hive 插入时手动指定分区一致,需要手动指定分区列的值

spark-sql (default)> insert overwrite testB Partition(dt='2021-06-26') 
select id,name,age from testA;

2.查询表数据

spark-sql (default)> select *from testB;

3.2.4删除数据

1.iceberg 并不会物理删除数据,下面演示 delete 操作,根据分区列进行删除 testB 表数据

spark-sql (default)> delete from testB where dt >='2021-06-21' and dt <='2021-06-26';

2.提示删除成功,再次查询数据。发现表中已无数据,但是存在 hdfs 上的物理并没有实际删除

3.查看 hdfs 数据,仍然存在。

3.2.5历史快照

1.每张表都拥有一张历史表,历史表的表名为当前表加上后缀.history,注意:查询历史表的时候必须是表的全称,不可用先切到 hadoop.db 库中再查 testB

spark-sql (default)> select *from hadoop_prod.db.testB.history;

2.可以查看到每次操作后的对应的快照记录,也可以查询对应快照表,快照表的表名在  原表基础上加上.snapshots,也是一样必须是表的全称不能简写

spark-sql (default)> select *from hadoop_prod.db.testB.snapshots;

3.可以在看到 commit 的时间,snapshot 快照的 id,parent_id 父节点,operation 操作类型, 已经 summary 概要,summary 概要字段中可以看到数据量大小,总条数,路径等信息。

两张表也可以根据 snapshot_id 快照 id 进行 join 关联查询。

spark-sql (default)>  select  *from hadoop_prod.db.testB.history  a join hadoop_prod.db.testB.snapshots b on a.snapshot_id=b.snapshot_id ;

4.知道了快照表和快照信息后,可以根据快照 id 来查询具体的历史信息,发进行检测是否误操作,如果是误操作则可通过 spark 重新刷新数据。查询方式如下

scala>
spark.read.option("snapshot-id","5549650043576786799").format("iceberg").load("/hive/w arehouse/db/testB").show

3.2.6隐藏分区(bug 时区不对)

1.days 函数

1)上面演示了创建分区表, 接下来演示创建隐藏分区表。隐藏分区支持的函数有 years,months,days,hours,bucket,truncate。比如接下来创建一张 testC 表,表中有id,name 和 ts时间戳。

create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (days(ts));

2)创建成功分别往里面插入不同天时间戳的数据

spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624773600 as timestamp)),(2,'李四',cast(1624860000 as timestamp));

3)插入成功之后再来查询表数据。

spark-sql (default)> select *from hadoop_prod.db.testC;

4)可以看到有两条数据,并且日期也不是同一天,查看 hdfs 上对应的分区。已经自动按天进行了分区。

2.years 函数

1)删除 testC 表,重新建表,字段还是不变,分区字段使用 years 函数

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (years(ts));

2)同样,插入两条不同年时间戳的数据,进行查询对比

spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624860000 as timestamp)),(2,'李四',cast(1593324000 as timestamp));

3)查询数据

spark-sql (default)> select *from hadoop_prod.db.testC;

4)再查看 hdfs 对应的地址,已经按年建好分区

3.month 函数

1)删除 testC 表,重新建表,字段不变, 使用 month 函数进行分区

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (months(ts));

2)同样,插入不同月份时间戳的两条记录

spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1624860000 as timestamp)),(2,'李四',cast(1622181600 as timestamp));

3)查询数据和 hdfs 对应地址

spark-sql (default)> select *from hadoop_prod.db.testC;

4.hours 函数

1)删除 testC 表,重新建表,字段不变使用 hours 函数

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (hours(ts));

2)插入两条不同小时的时间戳数据

spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1622181600 as timestamp)),(2,'李四',cast(1622178000 as timestamp));

3)查询数据和 hdfs 地址

spark-sql (default)> select *from hadoop_prod.db.testC;

4)发现时区不对,修改对应参数

root@hadoop103 ~]# vim /opt/module/spark-3.0.1-bin-hadoop2.7/conf/spark-defaults.conf spark.sql.session.timeZone=GMT+8

5)再次启动 spark sql 插入数据

[root@hadoop103 ~]# spark-sql
spark-sql (default)> insert into hadoop_prod.db.testC values(1,'张三',cast(1622181600 as timestamp)),(2,'李四',cast(1622178000 as timestamp));

6)查看 hdfs 路径,还是错误分区目录(bug)

5.bucket 函数(bug

1)删除 testC 表,重新创建,表字段不变,使用 bucket 函数。分桶 hash 算法采用 Murmur3 hash,官网介绍 https://iceberg.apache.org/spec/#partition-transforms

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (bucket(16,id));

2)插入一批测试数据,为什么分多批插入,有 bug:如果一批数据中有数据被分到同一个桶里会报错

insert into hadoop_prod.db.testC values
(1,'张 1',cast(1622152800 as timestamp)),(1,'李 1',cast(1622178000 as timestamp)), (2,'张 2',cast(1622152800 as timestamp)),(3,'李 2',cast(1622178000 as timestamp)), (4,'张 3',cast(1622152800 as timestamp)),(6,'李 3',cast(1622178000 as timestamp)), (5,'张 4',cast(1622152800 as timestamp)),(8,'李 4',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values
(9,'张 5',cast(1622152800 as timestamp)),(10,'李 5',cast(1622178000 as timestamp)), (11,'张 6',cast(1622152800 as timestamp)),(12,'李 6',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values
(13,'张 7',cast(1622152800 as timestamp)),(14,'李 7',cast(1622178000 as timestamp)),
(15,'张 8',cast(1622152800 as timestamp)),(16,'李 8',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values
(17,'张 9',cast(1622152800 as timestamp)),(18,'李 9',cast(1622178000 as timestamp)),
(18,'张 10',cast(1622152800 as timestamp)),(20,'李 10',cast(1622178000 as timestamp)); insert into hadoop_prod.db.testC values
(1001,'张 9',cast(1622152800 as timestamp)),(1003,'李 9',cast(1622178000 as timestamp)),
(1002,'张 10',cast(1622152800 as timestamp)),(1004,'李 10',cast(1622178000 as timestamp));

3)查看表数据和 hdfs 路径

spark-sql (default)> select *from hadoop_prod.db.testC;

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (truncate(4,id));

6.truncate 函数

1)删除表,重新建表,字段不变,使用 truncate 函数,截取长度来进行分区

spark-sql (default)> drop table hadoop_prod.db.testC; create table hadoop_prod.db.testC(
id bigint, name string, ts timestamp) using iceberg
partitioned by (truncate(4,id));

2)插入一批测试数据

insert into hadoop_prod.db.testC values
(10010001,' 张 1',cast(1622152800 as timestamp)),(10010002,' 李 1',cast(1622178000 as timestamp)),
(10010003,' 张 2',cast(1622152800 as timestamp)),(10020001,' 李 2',cast(1622178000     
as timestamp)),
(10020002,' 张 3',cast(1622152800 as timestamp)),(10030001,' 李 3',cast(1622178000  
as timestamp)),
(10040001,' 张 4',cast(1622152800 as timestamp)),(10050001,' 李 4',cast(1622178000 as timestamp));

3)查询表数据和 hdfs 地址,分区目录为 id 数/4 得到的值(计算方式是 /不是%)。

spark-sql (default)> select *from hadoop_prod.db.testC;

 



相关文章
|
6月前
|
存储 消息中间件 分布式计算
flink的常见知识点总结(一)
flink的常见知识点总结(一)
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。
|
SQL 存储 分布式计算
|
资源调度 Kubernetes 网络协议
一文搞懂Flink架构与任务编排|青训营笔记
本文主要讲述Flink的整体架构,以及流处理任务涉及的各个算子的调度编排机制。为模仿实现一个简易流处理引擎作下铺垫。
554 0
一文搞懂Flink架构与任务编排|青训营笔记
|
存储 SQL 资源调度
Flink 内核原理与实现-入门
Flink 内核原理与实现-入门
298 0
Flink 内核原理与实现-入门
|
存储 分布式计算 算法
2022年Flink面试题整理
JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理Flink集群中从节点TaskManager。
2400 0
|
存储 SQL 分布式计算
Flink 引擎简介 | 青训营笔记
从产品技术来看,Flink 具备如下流计算技术特征:完全一次保证:故障后应正确恢复有状态运算符中的状态;低延迟:越低越好。许多应用程序需要亚秒级延迟;高吞吐量:随着数据速率的增长,通过管道推送大量数据至关重要;强大的计算模型:框架应该提供一种编程模型,该模型不限制用户并允许各种各样的应用程序在没有故障的情况下,容错机制的开销很低;流量控制:来自慢速算子的反压应该由系统和数据源自然吸收,以避免因消费者缓慢而导致崩溃或降低性能;乱序数据的支持:支持由于其他原因导致的数据乱序达到、延迟到达后,计算出正确的结果;完备的流式语义:支持窗口等现代流式处理语义抽象;
217 0
Flink 引擎简介 | 青训营笔记
下一篇
无影云桌面