第 5 章 Structured Streaming 操作
5.1 基于 Structured Streaming 落明细数据
5.1.1创建测试 topic
[root@hadoop103 kafka_2.11-2.4.0]# bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka_2.4 --create --replication-factor 2 --partitions 12 --topic test1
1)启动 kafka,创建测试用的 topic
2)导入依赖
编写 producer 往 topic 里发送测试数据
package com.atguigu.iceberg.spark.structuredstreaming; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.Random; public class TestProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092"); props.put("acks", "-1"); props.put("batch.size", "1048576"); props.put("linger.ms", "5"); props.put("compression.type", "snappy"); props.put("buffer.memory", "33554432"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); Random random = new Random(); for (int i = 0; i < 10000000; i++) { producer.send(new ProducerRecord<String,String> ("test1",i+"\t"+random.nextInt(100)+"\t"+random.nextInt(3) +"\t"+System.currentTimeMillis())); } producer.flush(); producer.close(); } }
3)创建测试表
create table hadoop_prod.db.test_topic( uid bigint, courseid int, deviceid int, ts timestamp) using iceberg partitioned by(days(ts));
5.3 编写代码
基于 test1 的测试数据,编写结构化流代码,进行测试
package com.atguigu.iceberg.spark.structuredstreaming import java.sql.Timestamp import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, SparkSession} object TestTopicOperators { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog") .set("spark.sql.catalog.hadoop_prod.type", "hadoop") .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/hive/warehouse") .set("spark.sql.catalog.catalog-name.type", "hadoop") .set("spark.sql.catalog.catalog-name.default-namespace", "default") .set("spark.sql.sources.partitionOverwriteMode", "dynamic") .set("spark.sql.session.timeZone", "GMT+8") .set("spark.sql.shuffle.partitions", "12") //.setMaster("local[*]") .setAppName("test_topic") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() val df = sparkSession.readStream.format("kafka") .option("kafka.bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092") .option("subscribe", "test1") .option("startingOffsets", "earliest") .option("maxOffsetsPerTrigger", "10000").load() import sparkSession.implicits._ val query = df.selectExpr("cast (value as string)").as[String] .map(item => { val array = item.split("\t") val uid = array(0) val courseid = array(1) val deviceid = array(2) val ts = array(3) Test1(uid.toLong, courseid.toInt, deviceid.toInt, new Timestamp(ts.toLong)) }).writeStream.foreachBatch { (batchDF: Dataset[Test1], batchid: Long) => batchDF.writeTo("hadoop_prod.db.test_topic").overwritePartitions() }.option("checkpointLocation", "/ss/checkpoint") .start() query.awaitTermination() } case class Test1(uid: BigInt, courseid: Int, deviceid: Int, ts: Timestamp)
5.4 提交 yarn 测试速度
1)打成 jar 包,上传到集群,运行代码跑 yarn 模式 让 vcore 个数和 shuffle 分区数保持1:1 最高效运行
[root@hadoop103 lizunting]# spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --queue spark --class com.atguigu.iceberg.spark.structuredstreaming.TestTopicOperators iceberg-spark-demo-1.0-SNAPSHOT-jar-with-dependencies.jar
2)运行起来后,查看 Spark Web UI 界面监控速度。趋于稳定后,可以看到速度能到每秒10200,条左右,已经达到了我参数所设置的上限。当然分区数(kafka 分区和 shuffle 分区) 和 vcore 越多实时性也会越高目前测试是 12 分区。
3)实时性没问题,但是有一个缺点,没有像 hudi 一样解决小文件问题。解决过多文件数可以更改 trigger 触发时间,但也会影响实时效率,两者中和考虑使用。
4)最后是花了 18 分钟跑完 1000 万条数据,查询表数据观察是否有数据丢失。数据没有丢失。
第 6 章存在的问题和缺点
6.1问题
- 时区无法设置
- Spark Sql 黑窗口,缓存无法更新,修改表数据后,得需要关了黑窗口再重新打开,查询才是更新后的数据
- 表分区如果指定多个分区或分桶,那么插入批量数据时,如果这一批数据有多条数据在同一个分区会报错
6.2缺点
- 与 hudi 相比,没有解决小文件问题
- 与 hudi 相比,缺少行级更新,只能对表的数据按分区进行 overwrite 全量覆盖
第 7 章 Flink 操作
7.1配置参数和 jar 包
/opt/module/iceberg-apache-iceberg-0.11.1/flink-runtime/build/libs/ [root@hadoop103 libs]# cp *.jar /opt/module/flink-1.11.0/lib/
root@hadoop103 flink-1.11.0]# vim bin/config.sh export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3 export HADOOP_HDFS_HOME=/opt/module/hadoop-3.1.3 export HADOOP_YARN_HOME=/opt/module/hadoop-3.1.3 export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3 export HADOOP_CLASSPATH=`hadoop classpath` export PATH=$PATH:$HADOOP_CLASSPATH
1)Flink1.11 开始就不在提供 flink-shaded-hadoop-2-uber 的支持,所以如果需要 flink 支hadoop 得配置环境变量 HADOOP_CLASSPATH
2)目前 Iceberg 只支持 flink1.11.x 的版本,所以我这使用 flink1.11.0,将构建好的 Iceberg的 jar 包复制到 flink 下
7.2 Flink SQL Client
[root@hadoop103 ~]# cd /opt/module/flink-1.11.0/ [root@hadoop103 flink-1.11.0]# bin/start-cluster.sh
1)在 hadoop 环境下,启动一个单独的 flink 集群
2)启动 flin sql client
[root@hadoop103 flink-1.11.0]# bin/sql-client.sh embedded shell
7.3 使用 Catalogs 创建目录
1) flink 可以通过 sql client 来创建 catalogs 目录, 支持的方式有 hive catalog,hadoop catalog,custom catlog。我这里采用 hadoop catlog。
CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='hdfs://mycluster/flink/warehouse/', 'property-version'='1' );
2)使用当前 catalog
3)创建 sql-client-defaults.yaml,方便以后启动 flink-sql 客户端,走 iceberg 目录
Flink SQL> exit; [root@hadoop103 flink-1.11.0]# cd conf/ [root@hadoop103 conf]# vim sql-client-defaults.yaml catalogs: - name: hadoop_catalog type: iceberg catalog-type: hadoop warehouse: hdfs://mycluster/flink/warehouse/
7.4 Flink SQL 操作
7.4.1建库
[root@hadoop103 flink-1.11.0]# bin/sql-client.sh embedded shell
1)再次启动 Flink SQL客户端
2)可以使用默认数据库,也可以创建数据库
Flink SQL> CREATE DATABASE iceberg_db; Flink SQL> show databases;
3)使用 iceberg 数据库
7.4.2建表(flink 不支持隐藏分区)
建表,我这里直接创建分区表了,使用 flink 对接 iceberg 不能使用 iceberg 的隐藏分区这一特性,目前还不支持。
7.4.3 like建表
可以使用create table 表名 like 的 sql 语句创建表结构完全一样的表
7.4.4 insert into
Flink SQL> insert into iceberg.testA values(1001,' 张三',18,'2021-07-01'),(1001,' 李四 ',19,'2021-07-02');
7.4.5查询
7.4.6任务监控
1) 可 查 看 hadoop103 默 认 端 口 8081 查 看 standlone 模 式 任 务 是 否 成 功
2)插入数据后,同样 hdfs 路径上也是有对应目录和数据块
7.4.7insert overwrite
1)使用 overwrite 插入
Flink SQL> insert overwrite iceberg.testA values(1,' 王 五 ',18,'2021-07-01'),(2,' 马 六 ',19,'2021-07-02');
2)flink 默认使用流的方式插入数据,这个时候流的插入是不支持 overwrite 操作的
3)需要将插入模式进行修改,改成批的插入方式,再次使用 overwrite 插入数据。如需要改回流式操作参数设置为 SET execution.type = streaming ;
4)查询结果,已经将结果根据分区进行覆盖操作
第 8 章 Flink API 操作
8.1配置 pom.xml
(1)配置相关依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>iceberg-demo</artifactId> <groupId>com.atguigu.iceberg</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>icberg-flink-demo</artifactId> <properties> <flink.version>1.11.0</flink.version> <scala.version>2.12.10</scala.version> <scala.binary.version>2.12</scala.binary.version> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.22</slf4j.version> <iceberg.version>0.11.1</iceberg.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-blink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-flink-runtime</artifactId> <version>0.11.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </project>
8.2.1读取表数据
8.2.1.1batch read
package com.atguigu.iceberg.flink.sql; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.RowData; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.source.FlinkSource; public class TableOperations { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testA"); batchRead(env, tableLoader); env.execute(); } public static void batchRead(StreamExecutionEnvironment env, TableLoader tableLoader){ DataStream<RowData> batch = FlinkSource.forRowData().env(env). tableLoader(tableLoader).streaming(false).build(); batch.map(item -> item.getInt(0)+"\t"+item.getString(1)+"\t"+item.getInt(2) +"\t"+item.getString(3)).prin t(); } }
8.2.1.2streamingread
public static void streamingRead(StreamExecutionEnvironment env, TableLoader tableLoader){ DataStream<RowData> stream = FlinkSource.forRowData().env(env). tableLoader(tableLoader).streaming(true).build(); stream.print(); }
1)通过 streaming 的方式去读取数据
2)启动之后程序不会立马停止
3)因为是流处理,这个时候手动往表中追加一条数据
Flink SQL> insert into iceberg.testA values(3,'哈哈哈',18,'2021-07-01');
可以看到控制台,实时打印出了数据
8.3写数据
8.3.1Appending Data
public static void appendingData(StreamExecutionEnvironment env,TableLoader tableLoader){ DataStream<RowData> batch =. FlinkSource.forRowData().env(env). tableLoader(tableLoader).streaming(false).build(); TableLoader tableB = TableLoader.fromHadoopTable("hdfs://mycluster/flink/warehouse/iceberg/testB"); FlinkSink.forRowData(batch).tableLoader(tableB).build(); }
使用上面 create table testB like testA 的 testB 表,读取 A 表数据插入到 B 表数据采用的是 batch 批处理,代码执行两次并查询查看 append 效果
8.3.3OverwriteData
1)编写代码,将 overwrite 设置为 true
public static void overtData(StreamExecutionEnvironment env,TableLoader tableLoader){ DataStream<RowData> batch = FlinkSource.forRowData().env(env). tableLoader(tableLoader).streaming(false).build(); TableLoader tableB = TableLoader.fromHadoopTable( "hdfs://mycluster/flink/warehouse/iceberg/testB"); FlinkSink.forRowData(batch). tableLoader(tableB).overwrite(true).build(); }
2)查询 testB 表查看 overwrite 效果,根据分区将数据进行了覆盖操作
8.4模拟数仓
第 9 章 Flink 存在的问题
- Flink 不支持 Iceberg 隐藏分区
- 不支持通过计算列创建表
- 不支持创建带水位线的表
- 不支持添加列、删除列、重命名列