Common Iceberg errors
- Pitfalls we ran into
- 1 PartitionExpressionForMetastore class not found
- 2 Unable to connect to the Hive metastore
- 3 The 5.1.5-jhyde artifact cannot be downloaded
- 4 Yet another dependency problem (org/apache/avro/Conversion)
- Solution: all of these come down to dependency conflicts; the pom.xml below resolves them
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>iceberg-learning</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- project compiler -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- maven compiler -->
        <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <!-- sdk -->
        <java.version>1.8</java.version>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <!-- engine -->
        <hadoop.version>2.7.2</hadoop.version>
        <flink.version>1.12.7</flink.version>
        <flink.cdc.version>2.0.2</flink.cdc.version>
        <iceberg.version>0.12.1</iceberg.version>
        <hive.version>2.3.6</hive.version>
        <!-- <scope.type>provided</scope.type> -->
        <scope.type>compile</scope.type>
    </properties>

    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- Flink <= 1.13 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- Flink 1.14 -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- iceberg dependency -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime</artifactId>
            <version>${iceberg.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hadoop dependencies -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${scope.type}</scope>
        </dependency>
        <!-- hive dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>${scope.type}</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hive</groupId>
                    <artifactId>hive-llap-tez</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement><!-- lock down plugin versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>org.example.GenerateLog</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
```
Deleting snapshots
1. Code for compacting small files of a table in a Hive-based catalog
```scala
package org.example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.iceberg.flink.actions.Actions
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

import java.util
import java.util.concurrent.TimeUnit

object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    // hadoop catalog
    val tablePath = "hdfs:///user/hive/warehouse/iceberg_db/iceberg_table"

    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")

    val map = new util.HashMap[String, String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("warehouse", "/user/hive/warehouse")
    // map.put("datanucleus.schema.autoCreateTables", "true")
    map.put("uri", "thrift://hadoop101:9083")

    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6",     // catalog name
      new Configuration(),
      map                  // catalog properties (uri, warehouse, ...) built above
    )
    val identifier = TableIdentifier.of(
      Namespace.of("iceberg_db6"),  // database name
      "behavior_log_ib6")           // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()

    // Compact small files
    Actions.forTable(env, table)
      .rewriteDataFiles()
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute()

    // Expire historical snapshots older than the current snapshot
    val snapshot = table.currentSnapshot()
    // val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
    if (snapshot != null) {
      table.expireSnapshots()
        .expireOlderThan(snapshot.timestampMillis)
        .commit()
    }
  }
}
```
2. Code for expiring all snapshots older than five minutes before the latest snapshot
```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.flink.actions.Actions
import org.apache.iceberg.flink.{CatalogLoader, TableLoader}
import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory

import java.util
import java.util.concurrent.TimeUnit

object FlinkDataStreamSmallFileCompactTest {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache").setLevel(Level.INFO)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    // hive catalog
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    System.setProperty("HADOOP_USER_NAME", "root")

    val map = new util.HashMap[String, String]()
    map.put("type", "iceberg")
    map.put("catalog-type", "hive")
    map.put("property-version", "2")
    map.put("warehouse", "/user/hive/warehouse")
    // map.put("datanucleus.schema.autoCreateTables", "true")
    map.put("uri", "thrift://hadoop101:9083")

    val iceberg_catalog = CatalogLoader.hive(
      "hive_catalog6",     // catalog name
      new Configuration(),
      map                  // catalog properties built above
    )
    // Alternative table used during testing: behavior_with_date_log_ib
    val identifier = TableIdentifier.of(
      Namespace.of("iceberg_db6"),  // database name
      "behavior_log_ib6")           // table name
    val loader = TableLoader.fromCatalog(iceberg_catalog, identifier)
    loader.open()
    val table = loader.loadTable()

    // Compact small files
    Actions.forTable(env, table)
      .rewriteDataFiles()
      .maxParallelism(5)
      .targetSizeInBytes(128 * 1024 * 1024)
      .execute()

    // Expire all snapshots older than five minutes before the latest snapshot
    val snapshot = table.currentSnapshot()
    if (snapshot != null) {
      val old = snapshot.timestampMillis - TimeUnit.MINUTES.toMillis(5)
      table.expireSnapshots()
        .expireOlderThan(old)
        .commit()
      println(s"$identifier snapshot cleanup finished")
    }
  }
}
```
3. Metadata bloat as the table ages
Metadata keeps growing over time: even after running compaction and expiring snapshots in Iceberg, the metadata files are not deleted. A table with only about 6 MB of data ended up with tens of GB of metadata.
Solution
The number of metadata.json files can be controlled with the following table properties:
'write.metadata.delete-after-commit.enabled'='true', 'write.metadata.previous-versions-max'='5'
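For a table that already exists, the same properties can be applied after the fact. A minimal Spark SQL sketch, assuming the hive_iceberg_catalog and placeholder table name used in the CALL examples below:

```sql
-- Apply the metadata-retention properties to an existing Iceberg table
-- (catalog and table names follow the CALL examples below).
ALTER TABLE hive_iceberg_catalog.ods_base.xx_table_name SET TBLPROPERTIES (
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '5'
);
```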
The snap-*.avro (manifest list) and *-m0.avro (manifest) files will still keep accumulating; they can be kept under control by compacting small files and expiring snapshots, as shown below.
Expire snapshots:
CALL hive_iceberg_catalog.system.expire_snapshots('ods_base.xx_table_name', TIMESTAMP '2022-08-03 00:00:00.000', 10);
Rewrite data files. This does not delete the original files; to actually remove them you still need to expire snapshots:
CALL hive_iceberg_catalog.system.rewrite_data_files('ods_base.xx_table_name');
Rewrite manifests; passing false as the second argument means Spark's cache is not used, which avoids OOM:
CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name', false);
CALL hive_iceberg_catalog.system.rewrite_manifests('ods_base.xx_table_name');
Remove orphan files, i.e. files that are not referenced by any table metadata:
CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', dry_run => true);
Replace tablelocation with the actual table location; this cleans up files that are no longer needed:
CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'tablelocation/data');
CALL hive_iceberg_catalog.system.remove_orphan_files(table => 'ods_base.xx_table_name', location => 'hdfs://ns/user/hive/warehouse/hive_iceberg_catalog/ods_base.db/xx_table_name/data');
Spark and Iceberg versions
Installing spark-3.2.0-bin-hadoop3.2.tgz together with Iceberg 0.13.0 is currently the most stable community combination. (Spark 3.2.1 was tried and did not work.)
spark-sql command note: --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.0 downloads the Iceberg runtime automatically (only on the first run).
Flink and Iceberg version pairing
Jars for integrating Flink with Kafka, Hive, and Iceberg:
- flink-sql-connector-hive-2.3.6_2.12-1.13.5.jar
- flink-sql-connector-kafka_2.12-1.13.5.jar
- iceberg-flink-runtime-1.13-0.13.1.jar
- iceberg-mr-0.13.1.jar
- flink-sql-connector-mysql-cdc-2.1.1.jar
Notes on using Iceberg insert, delete, and update:
- Set how many metadata versions to retain
- 'format-version'='2' must be configured; delete and update are only supported on format v2 tables (see the DDL below)
```sql
CREATE CATALOG hive_catalog6 WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs:///user/hive/warehouse/hive_catalog6'
);

USE CATALOG hive_catalog6;

CREATE DATABASE xxzh_stock_mysql_db;
USE xxzh_stock_mysql_db;

CREATE TABLE stock_basic_iceberg_sink (
  `i`                  INT NOT NULL,
  `ts_code`            CHAR(10) NOT NULL,
  `symbol`             CHAR(10) NOT NULL,
  `name`               CHAR(10) NOT NULL,
  `area`               CHAR(20) NOT NULL,
  `industry`           CHAR(20) NOT NULL,
  `list_date`          CHAR(10) NOT NULL,
  `actural_controller` CHAR(100),
  PRIMARY KEY(`i`) NOT ENFORCED
) WITH (
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='5',
  'format-version'='2'
);

-- The general pattern for attaching properties to a table:
CREATE TABLE tablename (
  field1 field_type
) WITH (
  'key' = 'value'
);
```
MySQL -> Iceberg time zone issue
Conclusion: the source table carries no time zone, so the downstream table must use the local-time-zone type; with that, the problem goes away.
Downstream table: map MySQL datetime and timestamp columns to TIMESTAMP_LTZ instead of the TIMESTAMP they were originally mapped to.
LTZ stands for LOCAL TIME ZONE.
MySQL source table:
```java
String createSql = "CREATE TABLE stock_basic_source(\n" +
        "  `i` INT NOT NULL,\n" +
        "  `ts_code` CHAR(10) NOT NULL,\n" +
        "  `symbol` CHAR(10) NOT NULL,\n" +
        "  `name` CHAR(10) NOT NULL,\n" +
        "  `area` CHAR(20) NOT NULL,\n" +
        "  `industry` CHAR(20) NOT NULL,\n" +
        "  `list_date` CHAR(10) NOT NULL,\n" +
        "  `actural_controller` CHAR(100),\n" +
        "  `update_time` TIMESTAMP,\n" +
        "  `update_timestamp` TIMESTAMP,\n" +
        "  PRIMARY KEY(i) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'mysql-cdc',\n" +
        "  'hostname' = 'hadoop103',\n" +
        "  'port' = '3306',\n" +
        "  'username' = 'XX',\n" +
        "  'password' = 'XX',\n" +
        "  'database-name' = 'xxzh_stock',\n" +
        "  'table-name' = 'stock_basic2'\n" +
        ")";
```
Downstream (sink) table:
```java
String createSQl = "CREATE TABLE IF NOT EXISTS stock_basic2_iceberg_sink(\n" +
        "  `i` INT NOT NULL,\n" +
        "  `ts_code` CHAR(10) NOT NULL,\n" +
        "  `symbol` CHAR(10) NOT NULL,\n" +
        "  `name` CHAR(10) NOT NULL,\n" +
        "  `area` CHAR(20) NOT NULL,\n" +
        "  `industry` CHAR(20) NOT NULL,\n" +
        "  `list_date` CHAR(10) NOT NULL,\n" +
        "  `actural_controller` CHAR(100),\n" +
        "  `update_time` TIMESTAMP_LTZ,\n" +
        "  `update_timestamp` TIMESTAMP_LTZ,\n" +
        "  PRIMARY KEY(i) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'write.metadata.delete-after-commit.enabled'='true',\n" +
        "  'write.metadata.previous-versions-max'='5',\n" +
        "  'format-version'='2'\n" +
        ")";
```
Iceberg upsert
1. By default, Iceberg appends data flowing in from Kafka.
2. Setting 'write.upsert.enabled' = 'true' on the Iceberg table switches it to upsert mode; see the sketch below.
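A minimal Flink SQL sketch of an upsert-enabled table (the table and column names here are placeholders; upsert also requires 'format-version'='2' and a primary key):

```sql
CREATE TABLE behavior_log_upsert (
  `id`   INT NOT NULL,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED  -- identifier column used to match rows for upsert
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);
```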