Iceberg实战踩坑指南(四)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Iceberg实战踩坑指南

5 Structured Streaming 操作

5.1 基于 Structured Streaming 落明细数据

5.1.1创建测试 topic

[root@hadoop103 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper
hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 12 --topic test1

1)启动 kafka,创建测试用的 topic

2)导入依赖

编写 producer 往 topic 里发送测试数据

package com.atguigu.iceberg.spark.structuredstreaming;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties; import java.util.Random;
public class TestProducer {
   public static void main(String[] args) { 
      Properties props = new Properties();
      props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092"); 
      props.put("acks", "-1");
      props.put("batch.size", "1048576");
      props.put("linger.ms", "5"); props.put("compression.type", "snappy"); 
      props.put("buffer.memory", "33554432"); props.put("key.serializer",
   "org.apache.kafka.common.serialization.StringSerializer"); 
      props.put("value.serializer",
   "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, 
     String> producer = new KafkaProducer<String, String>(props); Random random = new 
      Random();
      for (int i = 0; i < 10000000; i++) { 
         producer.send(new ProducerRecord<String,String> 
         ("test1",i+"\t"+random.nextInt(100)+"\t"+random.nextInt(3)
          +"\t"+System.currentTimeMillis()));
      }
   producer.flush(); producer.close();
  }
}

3)创建测试表

create table hadoop_prod.db.test_topic( uid bigint,
courseid int, deviceid int, ts timestamp)
using iceberg partitioned by(days(ts));

5.3 编写代码

基于 test1 的测试数据,编写结构化流代码,进行测试

package com.atguigu.iceberg.spark.structuredstreaming
import java.sql.Timestamp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
object TestTopicOperators {
   def main(args: Array[String]): Unit = { 
     val sparkConf = new SparkConf()
    .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
    .set("spark.sql.catalog.hadoop_prod.warehouse", 
    "hdfs://mycluster/hive/warehouse")
    .set("spark.sql.catalog.catalog-name.type", "hadoop")
    .set("spark.sql.catalog.catalog-name.default-namespace", "default")
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .set("spark.sql.session.timeZone", "GMT+8")
    .set("spark.sql.shuffle.partitions", "12")
   //.setMaster("local[*]")
   .setAppName("test_topic")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() val 
    df = sparkSession.readStream.format("kafka")
    .option("kafka.bootstrap.servers", 
    "hadoop101:9092,hadoop102:9092,hadoop103:9092")
    .option("subscribe", "test1")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", "10000").load()
import sparkSession.implicits._
   val query = df.selectExpr("cast (value as string)").as[String]
    .map(item => {
      val array = item.split("\t") val uid = array(0)
      val courseid = array(1) val deviceid = array(2) val ts = array(3)
      Test1(uid.toLong, courseid.toInt, deviceid.toInt, new Timestamp(ts.toLong))
     }).writeStream.foreachBatch { (batchDF: Dataset[Test1], batchid: Long) => 
          batchDF.writeTo("hadoop_prod.db.test_topic").overwritePartitions()
     }.option("checkpointLocation", "/ss/checkpoint")
      .start() query.awaitTermination()
    }
case class Test1(uid: BigInt,
courseid: Int, deviceid: Int, ts: Timestamp)

5.4 提交 yarn 测试速度

1)打成 jar 包,上传到集群,运行代码跑 yarn 模式 让 vcore 个数和 shuffle 分区数保持1:1 最高效运行

[root@hadoop103 lizunting]# spark-submit  --master  yarn  --deploy-mode client
--driver-memory 1g  --num-executors 3 --executor-cores 4  --executor-memory 2g --queue spark  --class com.atguigu.iceberg.spark.structuredstreaming.TestTopicOperators
iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

2)运行起来后,查看 Spark Web UI 界面监控速度。趋于稳定后,可以看到速度能到每秒10200,条左右,已经达到了我参数所设置的上限。当然分区数(kafka 分区和 shuffle 分区) 和 vcore 越多实时性也会越高目前测试是 12 分区。

3)实时性没问题,但是有一个缺点,没有像 hudi 一样解决小文件问题。解决过多文件数可以更改 trigger 触发时间,但也会影响实时效率,两者中和考虑使用。

4)最后是花了 18 分钟跑完 1000 万条数据,查询表数据观察是否有数据丢失。数据没有丢失。

6 章存在的问题和缺点

6.1问题

  1. 时区无法设置
  2. Spark Sql 黑窗口,缓存无法更新,修改表数据后,得需要关了黑窗口再重新打开,查询才是更新后的数据
  3. 表分区如果指定多个分区或分桶,那么插入批量数据时,如果这一批数据有多条数据在同一个分区会报错

6.2缺点

  1. 与 hudi 相比,没有解决小文件问题
  2. 与 hudi 相比,缺少行级更新,只能对表的数据按分区进行 overwrite 全量覆盖

7 Flink 操作

7.1配置参数和 jar 

/opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/
[root@hadoop103 libs]# cp *.jar /opt/module/flink-1.11.0/lib/
root@hadoop103 flink-1.11.0]# vim bin/config.sh 
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 
export HADOOP_CLASSPATH=`hadoop classpath`
export PATH=$PATH:$HADOOP_CLASSPATH

1)Flink1.11 开始就不在提供 flink-shaded-hadoop-2-uber 的支持,所以如果需要 flink 支hadoop 得配置环境变量 HADOOP_CLASSPATH

2)目前 Iceberg 只支持 flink1.11.x 的版本,所以我这使用 flink1.11.0,将构建好的 Iceberg的 jar 包复制到 flink 下

7.2 Flink SQL Client

[root@hadoop103 ~]# cd /opt/module/flink-1.11.0/
[root@hadoop103 flink-1.11.0]# bin/start-cluster.sh

1)在 hadoop 环境下,启动一个单独的 flink 集群

2)启动 flin sql client

[root@hadoop103 flink-1.11.0]# bin/sql-client.sh embedded shell

7.3 使用 Catalogs 创建目录

1) flink 可以通过 sql client 来创建 catalogs 目录, 支持的方式有 hive catalog,hadoop catalog,custom catlog。我这里采用 hadoop catlog。

CREATE CATALOG hadoop_catalog WITH ( 
'type'='iceberg',
'catalog-type'='hadoop', 'warehouse'='hdfs://mycluster/flink/warehouse/', 'property-version'='1'
);

2)使用当前 catalog

3)创建 sql-client-defaults.yaml,方便以后启动 flink-sql 客户端,走 iceberg 目录

Flink SQL> exit;
[root@hadoop103 flink-1.11.0]# cd conf/ [root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs:
- name: hadoop_catalog
type: iceberg catalog-type: hadoop
warehouse: hdfs://mycluster/flink/warehouse/

7.4 Flink SQL 操作

7.4.1建库

[root@hadoop103 flink-1.11.0]# bin/sql-client.sh embedded shell

1)再次启动 Flink SQL客户端

2)可以使用默认数据库,也可以创建数据库

Flink SQL> CREATE DATABASE iceberg_db;
Flink SQL> show databases;

3)使用 iceberg 数据库

7.4.2建表(flink 不支持隐藏分区)

建表,我这里直接创建分区表了,使用 flink 对接 iceberg 不能使用 iceberg 的隐藏分区这一特性,目前还不支持。

7.4.3 like建表

可以使用create table 表名 like 的 sql 语句创建表结构完全一样的表

7.4.4 insert into

Flink SQL> insert into iceberg.testA values(1001,' 张三',18,'2021-07-01'),(1001,' 李四
',19,'2021-07-02');

7.4.5查询

7.4.6任务监控

1) 可 查 看 hadoop103 默 认 端 口 8081 查 看 standlone 模 式 任 务 是 否 成 功

2)插入数据后,同样 hdfs 路径上也是有对应目录和数据块

7.4.7insert overwrite

1)使用 overwrite 插入

Flink SQL> insert overwrite iceberg.testA values(1,' 王 五 ',18,'2021-07-01'),(2,' 马 六
',19,'2021-07-02');

2)flink 默认使用流的方式插入数据,这个时候流的插入是不支持 overwrite 操作的

3)需要将插入模式进行修改,改成批的插入方式,再次使用 overwrite 插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming ;

4)查询结果,已经将结果根据分区进行覆盖操作

8 Flink API 操作

8.1配置 pom.xml

(1)配置相关依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
  <artifactId>iceberg-demo</artifactId>
  <groupId>com.atguigu.iceberg</groupId>
  <version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>icberg-flink-demo</artifactId>
<properties>
   <flink.version>1.11.0</flink.version>
   <scala.version>2.12.10</scala.version>
   <scala.binary.version>2.12</scala.binary.version>
   <log4j.version>1.2.17</log4j.version>
   <slf4j.version>1.7.22</slf4j.version>
   <iceberg.version>0.11.1</iceberg.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
 </dependency>
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
 </dependency>
 <dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
   <version>${scala.version}</version>
 </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--
https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--  https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
 <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime</artifactId>
    <version>0.11.1</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>3.1.3</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
 </dependency>
</dependencies>
</project>

8.2.1读取表数据

8.2.1.1batch read

package com.atguigu.iceberg.flink.sql;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData;
import  org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.FlinkSource;
public class TableOperations {
   public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
           StreamExecutionEnvironment.getExecutionEnvironment();
        TableLoader tableLoader = 
     TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA");
           batchRead(env, tableLoader); env.execute();
   }
   public static void batchRead(StreamExecutionEnvironment env, TableLoader 
       tableLoader){
     DataStream<RowData>  batch = 
     FlinkSource.forRowData().env(env).
     tableLoader(tableLoader).streaming(false).build();
     batch.map(item ->
     item.getInt(0)+"\t"+item.getString(1)+"\t"+item.getInt(2)
      +"\t"+item.getString(3)).prin t();
  }
}

8.2.1.2streamingread

public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader){
     DataStream<RowData>  stream  = 
     FlinkSource.forRowData().env(env).
      tableLoader(tableLoader).streaming(true).build();
      stream.print();
}

1)通过 streaming 的方式去读取数据

2)启动之后程序不会立马停止

3)因为是流处理,这个时候手动往表中追加一条数据

Flink SQL> insert into iceberg.testA values(3,'哈哈哈',18,'2021-07-01');

可以看到控制台,实时打印出了数据

8.3写数据

8.3.1Appending Data

public static void appendingData(StreamExecutionEnvironment env,TableLoader tableLoader){
    DataStream<RowData> batch =. FlinkSource.forRowData().env(env).
    tableLoader(tableLoader).streaming(false).build(); TableLoader  tableB  =
    TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB"); 
    FlinkSink.forRowData(batch).tableLoader(tableB).build();
}

使用上面 create table testB like testA 的 testB 表,读取 A 表数据插入到 B 表数据采用的是 batch 批处理,代码执行两次并查询查看 append 效果

8.3.3OverwriteData

1)编写代码,将 overwrite 设置为 true

public static void overtData(StreamExecutionEnvironment env,TableLoader tableLoader){
    DataStream<RowData> batch =
      FlinkSource.forRowData().env(env).
      tableLoader(tableLoader).streaming(false).build(); TableLoader  tableB  =
      TableLoader.fromHadoopTable(
"hdfs://mycluster/flink/warehouse/iceberg/testB"); 
      FlinkSink.forRowData(batch).
      tableLoader(tableB).overwrite(true).build();
}

2)查询 testB 表查看 overwrite 效果,根据分区将数据进行了覆盖操作

8.4模拟数仓

flink 操作iceberg 的示例代码

9 Flink 存在的问题

  1. Flink 不支持 Iceberg 隐藏分区
  2. 不支持通过计算列创建表
  3. 不支持创建带水位线的表
  4. 不支持添加列、删除列、重命名列
目录
相关文章
|
存储 分布式计算 监控
深入浅出 HBase 实战 | 青训营笔记
Hbase是一种NoSQL数据库,这意味着它不像传统的RDBMS数据库那样支持SQL作为查询语言。Hbase是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多RDBMS系统的特性,比如列类型,辅助索引,触发器,和高级查询语言等待。
1113 0
深入浅出 HBase 实战 | 青训营笔记
|
SQL 分布式计算 DataX
HIVE3 深度剖析 (下篇)
HIVE3 深度剖析 (下篇)
|
SQL 存储 分布式计算
HIVE3 深度剖析 (上篇)
HIVE3 深度剖析 (上篇)
|
分布式计算 Hadoop
Iceberg实战踩坑指南(二)
Iceberg实战踩坑指南
226 0
Iceberg实战踩坑指南(三)
Iceberg实战踩坑指南
158 0
|
SQL 分布式计算 Hadoop
Iceberg实战踩坑指南(一)
Iceberg实战踩坑指南
1368 0
|
SQL 存储 分布式计算
Iceberg原理和项目使用技巧
Iceberg原理和项目使用技巧
863 0
|
Java Apache 开发工具
Flink 源码阅读环境搭建
阅读优秀的源码是提升我们代码技能最重要的手段之一,工欲善其事必先利其器,所以,搭建好源码阅读环境是我们阅读的第一步。
|
SQL 存储 分布式计算
手把手教学hive on spark,还不会的小伙伴快上车了
Hive3.1.2源码编译+Spark3.0.0+Hadoop3.1.3
459 0
|
存储 分布式计算 负载均衡
深入浅出 HBase 实战|青训营笔记
1.介绍 HBase 的适用场景和数据模型;2.分析 HBase 的整体架构和模块设计;3.针对大数据场景 HBase 的解决方案
260 0
深入浅出 HBase 实战|青训营笔记