Iceberg in Practice

Summary: Notes from hands-on practice with Apache Iceberg.


Common Iceberg errors


 

  • Pitfalls encountered:
  • 1. PartitionExpressionForMetastore class not found
  • 2. Cannot connect to the Hive metastore
  • 3. The 5.1.5-jhyde artifact cannot be downloaded
  • 4. Yet another dependency problem (org/apache/avro/Conversion)

  • Solution: all of these come down to dependencies; the POM below resolves them.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>iceberg-learning</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler-->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- engine-->
        <hadoop.version>2.7.2</hadoop.version>
        <flink.version>1.12.7</flink.version>
        <flink.cdc.version>2.0.2</flink.cdc.version>
        <iceberg.version>0.12.1</iceberg.version>
        <hive.version>2.3.6</hive.version>
        <!-- <scope.type>provided</scope.type>-->
        <scope.type>compile</scope.type>
    </properties>
    <dependencies>
    <!-- scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- flink Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- <= 1.13 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- 1.14 -->
    <!-- <dependency>-->
    <!-- <groupId>org.apache.flink</groupId>-->
    <!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
    <!-- <version>${flink.version}</version>-->
    <!-- <scope>${scope.type}</scope>-->
    <!-- </dependency>-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- iceberg Dependency -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime</artifactId>
        <version>${iceberg.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- hadoop Dependency-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- hive Dependency-->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-llap-tez</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr-runtime</artifactId>
        <version>3.5.2</version>
    </dependency>
    </dependencies>
    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>org.example.GenerateLog</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>


Snapshot deletion


1. Small-file compaction on a table registered in the Hive catalog

package org.example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.actions.Actions
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import java.util
import java.util.concurrent.TimeUnit
object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _
  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    // hadoop catalog
    val tablePath = "hdfs:///user/hive/warehouse/iceberg_db/iceberg_table"
    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")
    val map = new util.HashMap[String,String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("/warehouse", "/user/hive/warehouse")
//    map.put("datanucleus.schema.autoCreateTables", "true")
//    压缩小文件
//    快照过期处理
    map.put("uri", "thrift://hadoop101:9083")
    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6", // catalog name
      new Configuration(),
      map // pass the catalog properties built above
    )
    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
      "behavior_log_ib6") // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()
    Actions.forTable(env, table)
      .rewriteDataFiles
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute
    // expire all snapshots older than the current one
    val snapshot = table.currentSnapshot
    // val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
    if (snapshot != null) {
      table.expireSnapshots
        .expireOlderThan(snapshot.timestampMillis)
        .commit()
    }
  }
}

2. Expire all snapshots more than five minutes older than the latest snapshot

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.actions.Actions
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import java.util
import java.util.concurrent.TimeUnit
object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _
  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")
    val map = new util.HashMap[String, String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("/warehouse", "/user/hive/warehouse")
    //    map.put("datanucleus.schema.autoCreateTables", "true")
    //    压缩小文件
    //    快照过期处理
    map.put("uri", "thrift://hadoop101:9083")
    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6", // catalog name
      new Configuration(),
      map // pass the catalog properties built above
    )
//    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
//      "behavior_with_date_log_ib") // alternative table name
    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
      "behavior_log_ib6") // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()
    Actions.forTable(env, table)
      .rewriteDataFiles
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute
    // expire snapshots older than five minutes before the latest snapshot
    val snapshot = table.currentSnapshot
    if (snapshot != null) {
      val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
      table.expireSnapshots
        .expireOlderThan(old)
        .commit()
      println(s"$identifier: snapshot expiration finished")
    }
  }
}

3. Metadata bloat as the table ages

Metadata keeps growing over time: even after Iceberg compaction and snapshot deletion, the old metadata files are not removed. A table holding only about 6 MB of data can end up with tens of GB of metadata.

Solution

The number of retained metadata.json files can be controlled with the following table properties:

 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5'

The snap-*.avro snapshot files and *-m0.avro manifest files will still keep accumulating; they can be kept under control by compacting small files and expiring snapshots (see the sketch below).
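
As a minimal sketch — assuming a table already loaded via TableLoader as in the compaction examples above — the same two properties can also be set programmatically with Iceberg's UpdateProperties API (the object and method names here are illustrative):

import org.apache.iceberg.Table

object MetadataRetention {
  // Sketch: cap metadata.json retention on an already-loaded Iceberg table
  // (obtained e.g. via TableLoader.fromCatalog(...).loadTable() as above).
  def limitMetadataVersions(table: Table): Unit = {
    table.updateProperties()
      .set("write.metadata.delete-after-commit.enabled", "true") // drop old metadata.json after each commit
      .set("write.metadata.previous-versions-max", "5")          // keep at most 5 previous versions
      .commit()
  }
}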


Expire snapshots

CALL hive_iceberg_catalog.system.expire_snapshots('ods_base.xx_table_name', TIMESTAMP '2022-08-03 00:00:00.000', 10);


Rewrite data files. This does not delete the old files themselves; to actually remove them you still need to expire snapshots.

CALL hive_iceberg_catalog.system.rewrite_data_files('ods_base.xx_table_name');


Rewrite manifests. Passing false as the second argument disables Spark caching, which helps avoid OOM:

CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name', false)

CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name')


Remove orphan files, i.e. files that are no longer referenced by any table metadata:

CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', dry_run => true)


Replace tablelocation with the actual table location; unused files under it will be cleaned up:

CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'tablelocation/data')

CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'hdfs://ns/user/hive/warehouse/hive_iceberg_catalog/ods_base.db/xx_table_name/data')




Spark and Iceberg versions


Installing spark-3.2.0-bin-hadoop3.2.tgz together with Iceberg 0.13.0 is currently the most stable combination in the community. (Spark 3.2.1 was tried and does not work.)

spark-sql usage note: the option --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 downloads the Iceberg package automatically (only on the first run).
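
A minimal sketch of the equivalent programmatic setup (assuming the iceberg-spark-runtime jar above is on the classpath; the metastore URI is the placeholder used throughout this article, and the catalog name matches the CALL statements in the previous section):

import org.apache.spark.sql.SparkSession

object IcebergMaintenance {
  def main(args: Array[String]): Unit = {
    // Sketch: a SparkSession with an Iceberg Hive catalog, so the maintenance
    // procedures above can be invoked through spark.sql(...).
    val spark = SparkSession.builder()
      .appName("iceberg-maintenance")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.hive_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hive_iceberg_catalog.type", "hive")
      .config("spark.sql.catalog.hive_iceberg_catalog.uri", "thrift://hadoop101:9083")
      .getOrCreate()

    // e.g. expire snapshots older than the given timestamp, retaining at least 10
    spark.sql(
      "CALL hive_iceberg_catalog.system.expire_snapshots('ods_base.xx_table_name', " +
        "TIMESTAMP '2022-08-03 00:00:00.000', 10)")

    spark.stop()
  }
}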

 


Flink and Iceberg version pairing


JARs needed to integrate Flink with Kafka, Hive, and Iceberg:

flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar  
flink-sql-connector-kafka_2.12-1.13.5.jar  
iceberg-flink-runtime-1.13-0.13.1.jar  
iceberg-mr-0.13.1.jar
flink-sql-connector-mysql-cdc-2.1.1.jar

Notes on using Iceberg insert, delete, and update:

  1. Set how many metadata versions to retain
  2. 'format-version'='2' must be configured; delete and update are only supported with format version 2
CREATE CATALOG hive_catalog6 WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs:///user/hive/warehouse/hive_catalog6'
);
use catalog hive_catalog6;
CREATE DATABASE xxzh_stock_mysql_db;
USE xxzh_stock_mysql_db;
CREATE TABLE stock_basic_iceberg_sink(
  `i`  INT NOT NULL,
  `ts_code`    CHAR(10) NOT NULL,
  `symbol`   CHAR(10) NOT NULL,
  `name` char(10) NOT NULL,
  `area`   CHAR(20) NOT NULL,
  `industry`   CHAR(20) NOT NULL,
  `list_date`   CHAR(10) NOT NULL,
  `actural_controller`   CHAR(100) ,
   PRIMARY KEY(i) NOT ENFORCED
) with(
 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5',
 'format-version'='2'
)
-- How to add properties to a table
create table tablename(
   field1 field_type
) with (
   'key' = 'value'
)



MySQL -> Iceberg time zone issue


 
Conclusion: the MySQL source columns carry no time zone, so the downstream (sink) table needs to use a local-time-zone type; with that change the values come through correctly.

In the sink table, map MySQL datetime and timestamp columns to TIMESTAMP_LTZ instead of the TIMESTAMP used before.
LTZ stands for LOCAL TIME ZONE.

MySQL source table:

      String createSql = "CREATE TABLE stock_basic_source(\n" +
                "  `i`  INT NOT NULL,\n" +
                "  `ts_code`     CHAR(10) NOT NULL,\n" +
                "  `symbol`   CHAR(10) NOT NULL,\n" +
                "  `name` char(10) NOT NULL,\n" +
                "  `area`   CHAR(20) NOT NULL,\n" +
                "  `industry`   CHAR(20) NOT NULL,\n" +
                "  `list_date`   CHAR(10) NOT NULL,\n" +
                "  `actural_controller`   CHAR(100),\n" +
                "  `update_time`   TIMESTAMP\n," +
                "  `update_timestamp`   TIMESTAMP\n," +
                "    PRIMARY KEY(i) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hadoop103',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'XX',\n" +
                "  'password' = 'XX" +
                "  'database-name' = 'xxzh_stock',\n" +
                "  'table-name' = 'stock_basic2'\n" +
                ")" ;

Sink table:

     String createSQl = "CREATE TABLE if not exists stock_basic2_iceberg_sink(\n" +
                "  `i`  INT NOT NULL,\n" +
                "  `ts_code`    CHAR(10) NOT NULL,\n" +
                "  `symbol`   CHAR(10) NOT NULL,\n" +
                "  `name` char(10) NOT NULL,\n" +
                "  `area`   CHAR(20) NOT NULL,\n" +
                "  `industry`   CHAR(20) NOT NULL,\n" +
                "  `list_date`   CHAR(10) NOT NULL,\n" +
                "  `actural_controller`   CHAR(100) ,\n" +
                "  `update_time`   TIMESTAMP_LTZ\n," +
                "  `update_timestamp`   TIMESTAMP_LTZ\n," +
                "   PRIMARY KEY(i) NOT ENFORCED\n" +
                ") with(\n" +
                " 'write.metadata.delete-after-commit.enabled'='true',\n" +
                " 'write.metadata.previous-versions-max'='5',\n" +
                " 'format-version'='2'\n" +
                ")";


iceberg upsert


1. By default, Iceberg appends the data streamed in from Kafka.
2. Setting the table property 'write.upsert.enabled' = 'true' on the Iceberg table enables upsert mode (see the sketch below).
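
A rough sketch of such a table definition (table and column names are illustrative; upsert requires format version 2 and a primary key so Iceberg knows which rows to replace, consistent with the v2 table shown earlier):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

object UpsertTableExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // same Iceberg Hive catalog as in the DDL earlier in this article
    tEnv.executeSql(
      "CREATE CATALOG hive_catalog6 WITH (" +
        "'type'='iceberg','catalog-type'='hive'," +
        "'uri'='thrift://hadoop101:9083'," +
        "'warehouse'='hdfs:///user/hive/warehouse/hive_catalog6')")
    tEnv.executeSql("USE CATALOG hive_catalog6")

    // an upsert-mode table: rows with the same primary key replace earlier versions
    tEnv.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS behavior_log_upsert (
        |  `user_id`  INT NOT NULL,
        |  `behavior` STRING,
        |  `ts`       TIMESTAMP(3),
        |  PRIMARY KEY (`user_id`) NOT ENFORCED
        |) WITH (
        |  'format-version' = '2',
        |  'write.upsert.enabled' = 'true',
        |  'write.metadata.delete-after-commit.enabled' = 'true',
        |  'write.metadata.previous-versions-max' = '5'
        |)
      """.stripMargin)
  }
}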

 

 

Iceberg maintenance in short: delete expired snapshots, old metadata files, and orphan files, and compact data files and manifests.

