第4章 集成 Spark 引擎
4.1 环境准备
Paimon 目前支持 Spark 3.4、3.3、3.2 和 3.1。课程使用的Spark版本是3.3.1。
1)上传并解压Spark安装包
tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1
2)配置环境变量
sudo vim /etc/profile.d/my_env.sh
export SPARK_HOME=/opt/module/spark-3.3.1 export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh
3)拷贝paimon的jar包到Spark的jars目录
拷贝jar报到spark的jars目录(也可以运行时 --jars)
下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/0.5-SNAPSHOT/
cp paimon-spark-3.3-0.5-20230703.002437-65.jar /opt/module/spark/jars
4.2 Catalog
启动spark-sql时,指定Catalog。切换到catalog后,Spark现有的表将无法直接访问,可以使用spark_catalog.d a t a b a s e n a m e . {database_name}.databasename.{table_name}来访问Spark表。
注册catalog可以启动时指定,也可以配置在spark-defaults.conf中
4.2.1 文件系统
spark-sql \
–conf spark.sql.catalog.fs=org.apache.paimon.spark.SparkCatalog \
–conf spark.sql.catalog.fs.warehouse=hdfs://hadoop102:8020/spark/paimon/fs
其中,参数前缀为:spark.sql.catalog.
USE fs.default;
4.2.2 Hive
1)启动hive的metastore服务
nohup hive --service metastore &
2)启动时注册Catalog
spark-sql \ --conf spark.sql.catalog.hive=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.hive.warehouse=hdfs://hadoop102:8020/spark/paimon/hive \ --conf spark.sql.catalog.hive.metastore=hive \ --conf spark.sql.catalog.hive.uri=thrift://hadoop102:9083
切换到该catalog下的default数据库:
USE hive.default;
3)禁用 Hive ACID(Hive3)
hive.strict.managed.tables=false hive.create.as.insert.only=false metastore.create.as.acid=false
使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置
hive.metastore.disallow.inknown.col.type.changes=false
4.3 DDL
4.3.1 建表
4.3.1.1 管理表
在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。
1)创建表
CREATE TABLE tests ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) TBLPROPERTIES ( 'primary-key' = 'dt,hh,user_id' );
2)创建分区表
CREATE TABLE tests_p ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) PARTITIONED BY (dt, hh) TBLPROPERTIES ( 'primary-key' = 'dt,hh,user_id' );
通过配置partition.expiration-time,可以自动删除过期的分区。
如果定义了主键,则分区字段必须是主键的子集。
可以定义以下三类字段为分区字段:
创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。
事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。
CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。
3)Create Table As
表可以通过查询的结果创建和填充,例如,我们有一个这样的sql: CREATE TABLE table_b AS SELECT id, name FORM table_a, 生成的表table_b将相当于创建表并插入数据以下语句:CREATE TABLE table_b(id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;
使用CREATE TABLE AS SELECT时我们可以指定主键或分区。
CREATE TABLE tests1( user_id BIGINT, item_id BIGINT ); CREATE TABLE tests2 AS SELECT * FROM tests1; -- 指定分区 CREATE TABLE tests2_p PARTITIONED BY (dt) AS SELECT * FROM tests_p; -- 指定配置 CREATE TABLE tests3( user_id BIGINT, item_id BIGINT ) TBLPROPERTIES ('file.format' = 'orc'); CREATE TABLE tests3_op TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM tests3; -- 指定主键 CREATE TABLE tests_pk TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM tests; -- 指定主键和分区 CREATE TABLE tests_all PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM tests_p;
4)表属性
用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。有关此类属性的完整列表,请参阅配置https://paimon.apache.org/docs/master/maintenance/configurations/。
CREATE TABLE tbl( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING, hh STRING ) PARTITIONED BY (dt, hh) TBLPROPERTIES ( 'primary-key' = 'dt,hh,user_id', 'bucket' = '2', 'bucket-key' = 'user_id' );
4.3.1.2 外部表
外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。
Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。
Spark3仅支持通过Scala API创建外部表。以下 Scala 代码将位于 hdfs:///path/to/table 的表加载到 DataSet 中。
val dataset = spark.read.format(“paimon”).load(“hdfs:///path/to/table”)
4.3.2 修改表
4.3.2.1 修改表
1)更改/添加表属性
ALTER TABLE tests SET TBLPROPERTIES ( 'write-buffer-size' = '256 MB' );
2)重命名表名称
ALTER TABLE tests1 RENAME TO tests_new;
3)删除表属性
ALTER TABLE tests UNSET TBLPROPERTIES ('write-buffer-size');
4.3.2.2 修改列
1)添加新列
ALTER TABLE tests ADD COLUMNS (c1 INT, c2 STRING);
2)重命名列名称
ALTER TABLE tests RENAME COLUMN c1 TO c0;
3)删除列
ALTER TABLE my_table DROP COLUMNS(c0, c2);
4)更改列的可为空性
CREATE TABLE tests_null( id INT, coupon_info FLOAT NOT NULL ); -- Spark只支持将not null改为 nullable ALTER TABLE tests_null ALTER COLUMN coupon_info DROP NOT NULL;
5)更改列注释
ALTER TABLE tests ALTER COLUMN user_id COMMENT 'user id'
6)添加列位置
ALTER TABLE tests ADD COLUMN a INT FIRST;
ALTER TABLE tests ADD COLUMN b INT AFTER a;
注意:这种操作在hive中是不允许的,使用hive catalog无法执行,需要关闭hive的参数限制:
vim /opt/module/hive/conf/hive-site.xml;
<property> <name>hive.metastore.disallow.incompatible.col.type.changes</name> <value>false</value> </property>
重启hive metastore服务。
7)更改列位置
ALTER TABLE tests ALTER COLUMN b FIRST; ALTER TABLE tests ALTER COLUMN a AFTER user_id;
8)更改列类型
ALTER TABLE tests ALTER COLUMN a TYPE DOUBLE;