Iceberg in practice



Common Iceberg errors


 

  • Pitfalls we ran into
  • 1 PartitionExpressionForMetastore class not found
  • 2 Cannot connect to the Hive metastore
  • 3 The 5.1.5-jhyde package cannot be downloaded
  • 4 Yet another dependency problem (org/apache/avro/Conversion)

  • Solution: all of these come down to dependencies; the POM below resolves them
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>iceberg-learning</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler-->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- engine-->
        <hadoop.version>2.7.2</hadoop.version>
        <flink.version>1.12.7</flink.version>
        <flink.cdc.version>2.0.2</flink.cdc.version>
        <iceberg.version>0.12.1</iceberg.version>
        <hive.version>2.3.6</hive.version>
        <!-- <scope.type>provided</scope.type>-->
        <scope.type>compile</scope.type>
    </properties>
    <dependencies>
    <!-- scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- flink Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- <= 1.13 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- 1.14 -->
    <!-- <dependency>-->
    <!-- <groupId>org.apache.flink</groupId>-->
    <!-- <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
    <!-- <version>${flink.version}</version>-->
    <!-- <scope>${scope.type}</scope>-->
    <!-- </dependency>-->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- iceberg Dependency -->
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-flink-runtime</artifactId>
        <version>${iceberg.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- hadoop Dependency-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- hive Dependency-->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-llap-tez</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.antlr</groupId>
        <artifactId>antlr-runtime</artifactId>
        <version>3.5.2</version>
    </dependency>
    </dependencies>
    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>org.example.GenerateLog</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>


Deleting snapshots


1. Compacting small files for a table in a Hive catalog

package org.example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.actions.Actions
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import java.util
import java.util.concurrent.TimeUnit
object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _
  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    // hadoop catalog
    val tablePath = "hdfs:///user/hive/warehouse/iceberg_db/iceberg_table"
    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")
    // catalog properties for the Hive metastore
    val map = new util.HashMap[String, String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("warehouse", "/user/hive/warehouse")
//    map.put("datanucleus.schema.autoCreateTables", "true")
    map.put("uri", "thrift://hadoop101:9083")
    // the job below compacts small files and then expires old snapshots
    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6",      // catalog name
      new Configuration(),
      map                   // catalog properties, including the metastore URI
    )
    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
      "behavior_log_ib6")                                            // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()
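    // compact small files: rewrite data files toward the 128 MB target size, with parallelism 5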
    Actions.forTable(env, table)
      .rewriteDataFiles
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute
    // expire all snapshots older than the current one
    val snapshot = table.currentSnapshot
    // val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
    if (snapshot != null) {
      table.expireSnapshots
        .expireOlderThan(snapshot.timestampMillis)
        .commit()
    }
  }
}

2. Expiring all snapshots older than five minutes before the latest snapshot

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.actions.Actions
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import java.util
import java.util.concurrent.TimeUnit
object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _
  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)
    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")
    // catalog properties for the Hive metastore
    val map = new util.HashMap[String, String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("warehouse", "/user/hive/warehouse")
    //    map.put("datanucleus.schema.autoCreateTables", "true")
    map.put("uri", "thrift://hadoop101:9083")
    // the job below compacts small files and then expires old snapshots
    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6",      // catalog name
      new Configuration(),
      map                   // catalog properties, including the metastore URI
    )
//    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
//      "behavior_with_date_log_ib")                                   // alternative table name
    val identifier = TableIdentifier.of(Namespace.of("iceberg_db6"), // database name
      "behavior_log_ib6")                                            // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()
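    // compact small files: rewrite data files toward the 128 MB target size, with parallelism 5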
    Actions.forTable(env, table)
      .rewriteDataFiles
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute
    // expire every snapshot older than 5 minutes before the latest snapshot
    val snapshot = table.currentSnapshot
    if (snapshot != null) {
      // compute the cutoff only after the null check to avoid an NPE on empty tables
      val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
      table.expireSnapshots
        .expireOlderThan(old)
        .commit()
      println(s"Snapshot expiration finished for table $identifier")
    }
  }
}

3. Metadata bloat as the table ages

Metadata keeps growing over time: Iceberg compacts files and expires snapshots, but the old metadata files are never deleted, so a table with only 6 MB of data can end up with tens of GB of metadata.

Solution

The number of metadata.json files that are kept can be controlled with the following table properties (a sketch for applying them to an existing table with ALTER TABLE follows below):

 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5'
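
For an existing table, the same properties can also be applied with ALTER TABLE. A minimal Flink SQL sketch, assuming the hive_catalog6 catalog and the iceberg_db6.behavior_log_ib6 table from the compaction example above:

-- apply the metadata retention properties to an existing Iceberg table
ALTER TABLE hive_catalog6.iceberg_db6.behavior_log_ib6 SET (
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='5'
);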

The snapshot list files (snap-*.avro) and manifest files (*-m0.avro) will still keep growing; they can be kept under control by compacting small files and expiring snapshots.


Expire snapshots (snapshots older than the given timestamp are removed, while keeping at least the 10 most recent):

CALL hive_iceberg_catalog.system.expire_snapshots('ods_base.xx_table_name', TIMESTAMP '2022-08-03 00:00:00.000', 10);


Rewrite data files. This does not delete the old files; to actually remove them you still need to expire snapshots:

CALL hive_iceberg_catalog.system.rewrite_data_files('ods_base.xx_table_name');


Passing false means the rewrite does not use Spark's cache, which avoids OOM:

CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name', false)

CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name')


Remove orphan files, i.e. files not referenced by any table metadata (with dry_run => true the files are only listed, not deleted):

CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', dry_run => true)


Replace tablelocation with the actual table location; unused files under it will be cleaned up:

CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'tablelocation/data')


CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'hdfs://ns/user/hive/warehouse/hive_iceberg_catalog/ods_base.db/xx_table_name/data')




Spark and Iceberg versions


Installing spark-3.2.0-bin-hadoop3.2.tgz together with Iceberg 0.13.0 is currently the most stable combination in the community. (Spark 3.2.1 was tried and did not work.)

spark-sql option: --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 downloads the Iceberg runtime automatically (only on the first run).
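
Once spark-sql is running, a quick smoke test is to browse the catalog. A minimal sketch, assuming spark-sql was launched with the --packages option above and that a catalog named hive_iceberg_catalog (the name used in the maintenance calls earlier) was configured via spark.sql.catalog.hive_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog plus its Hive metastore settings; the namespace and table name are placeholders:

-- list namespaces and tables, then sample a table through the Iceberg catalog
SHOW NAMESPACES IN hive_iceberg_catalog;
SHOW TABLES IN hive_iceberg_catalog.ods_base;
SELECT * FROM hive_iceberg_catalog.ods_base.xx_table_name LIMIT 10;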

 


Flink and Iceberg version pairing


Jars needed to integrate Flink with Kafka, Hive, and Iceberg:

flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar  
flink-sql-connector-kafka_2.12-1.13.5.jar  
iceberg-flink-runtime-1.13-0.13.1.jar  
iceberg-mr-0.13.1.jar
flink-sql-connector-mysql-cdc-2.1.1.jar

Notes on using insert, delete, and update with Iceberg:

  1. Set how many metadata versions to retain
  2. 'format-version'='2' must be set; delete and update are only supported with it
CREATE CATALOG hive_catalog6 WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs:///user/hive/warehouse/hive_catalog6'
);
use catalog hive_catalog6;
CREATE DATABASE xxzh_stock_mysql_db;
USE xxzh_stock_mysql_db;
CREATE TABLE stock_basic_iceberg_sink(
  `i`  INT NOT NULL,
  `ts_code`    CHAR(10) NOT NULL,
  `symbol`   CHAR(10) NOT NULL,
  `name` char(10) NOT NULL,
  `area`   CHAR(20) NOT NULL,
  `industry`   CHAR(20) NOT NULL,
  `list_date`   CHAR(10) NOT NULL,
  `actural_controller`   CHAR(100) ,
   PRIMARY KEY(i) NOT ENFORCED
) with(
 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5',
 'format-version'='2'
)
-- how to add properties to a table
create table tablename(
   field1 field_type
) with (
   'key' = 'value'
)



MySQL to Iceberg time zone issue


 
Conclusion: the source table has no time zone, so the downstream (sink) table needs a local-time-zone type; with that, the problem goes away.

Sink table: the MySQL datetime and timestamp columns, which previously mapped to TIMESTAMP, are changed to TIMESTAMP_LTZ.
LTZ stands for LOCAL TIME ZONE.

MySQL source table:

      String createSql = "CREATE TABLE stock_basic_source(\n" +
                "  `i`  INT NOT NULL,\n" +
                "  `ts_code`     CHAR(10) NOT NULL,\n" +
                "  `symbol`   CHAR(10) NOT NULL,\n" +
                "  `name` char(10) NOT NULL,\n" +
                "  `area`   CHAR(20) NOT NULL,\n" +
                "  `industry`   CHAR(20) NOT NULL,\n" +
                "  `list_date`   CHAR(10) NOT NULL,\n" +
                "  `actural_controller`   CHAR(100),\n" +
                "  `update_time`   TIMESTAMP,\n" +
                "  `update_timestamp`   TIMESTAMP,\n" +
                "    PRIMARY KEY(i) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'hadoop103',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'XX',\n" +
                "  'password' = 'XX',\n" +
                "  'database-name' = 'xxzh_stock',\n" +
                "  'table-name' = 'stock_basic2'\n" +
                ")" ;

Sink table:

     String createSQl = "CREATE TABLE if not exists stock_basic2_iceberg_sink(\n" +
                "  `i`  INT NOT NULL,\n" +
                "  `ts_code`    CHAR(10) NOT NULL,\n" +
                "  `symbol`   CHAR(10) NOT NULL,\n" +
                "  `name` char(10) NOT NULL,\n" +
                "  `area`   CHAR(20) NOT NULL,\n" +
                "  `industry`   CHAR(20) NOT NULL,\n" +
                "  `list_date`   CHAR(10) NOT NULL,\n" +
                "  `actural_controller`   CHAR(100) ,\n" +
                "  `update_time`   TIMESTAMP_LTZ\n," +
                "  `update_timestamp`   TIMESTAMP_LTZ\n," +
                "   PRIMARY KEY(i) NOT ENFORCED\n" +
                ") with(\n" +
                " 'write.metadata.delete-after-commit.enabled'='true',\n" +
                " 'write.metadata.previous-versions-max'='5',\n" +
                " 'format-version'='2'\n" +
                ")";


iceberg upsert


1. By default, Iceberg appends the data flowing in from Kafka.
2. Setting the table property 'write.upsert.enabled' = 'true' on the Iceberg table switches it to upsert mode (a sketch follows below).

 

 

Iceberg: deleting expired snapshots, old metadata files, and orphan files, and compacting data files and manifests

