流数据湖平台Apache Paimon(五)集成 Spark 引擎

简介: 流数据湖平台Apache Paimon(五)集成 Spark 引擎

第4章 集成 Spark 引擎

4.1 环境准备

Paimon 目前支持 Spark 3.4、3.3、3.2 和 3.1。课程使用的Spark版本是3.3.1。

1)上传并解压Spark安装包

tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/

mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1

2)配置环境变量

sudo vim /etc/profile.d/my_env.sh

export SPARK_HOME=/opt/module/spark-3.3.1
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/my_env.sh

3)拷贝paimon的jar包到Spark的jars目录

拷贝jar报到spark的jars目录(也可以运行时 --jars)

下载地址:https://repository.apache.org/snapshots/org/apache/paimon/paimon-spark-3.3/0.5-SNAPSHOT/

cp paimon-spark-3.3-0.5-20230703.002437-65.jar /opt/module/spark/jars

4.2 Catalog

启动spark-sql时,指定Catalog。切换到catalog后,Spark现有的表将无法直接访问,可以使用spark_catalog.d a t a b a s e n a m e . {database_name}.databasename.{table_name}来访问Spark表。

注册catalog可以启动时指定,也可以配置在spark-defaults.conf中

4.2.1 文件系统

spark-sql \

–conf spark.sql.catalog.fs=org.apache.paimon.spark.SparkCatalog \

–conf spark.sql.catalog.fs.warehouse=hdfs://hadoop102:8020/spark/paimon/fs

其中,参数前缀为:spark.sql.catalog.

USE fs.default;

4.2.2 Hive

1)启动hive的metastore服务

nohup hive --service metastore &

2)启动时注册Catalog

spark-sql \
  --conf spark.sql.catalog.hive=org.apache.paimon.spark.SparkCatalog \
  --conf spark.sql.catalog.hive.warehouse=hdfs://hadoop102:8020/spark/paimon/hive \
  --conf spark.sql.catalog.hive.metastore=hive \
  --conf spark.sql.catalog.hive.uri=thrift://hadoop102:9083

切换到该catalog下的default数据库:

USE hive.default;

3)禁用 Hive ACID(Hive3)

hive.strict.managed.tables=false
hive.create.as.insert.only=false
metastore.create.as.acid=false

使用hive Catalog通过alter table更改不兼容的列类型时,参见 HIVE-17832。需要配置

hive.metastore.disallow.inknown.col.type.changes=false

4.3 DDL

4.3.1 建表

4.3.1.1 管理表

在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。

1)创建表

CREATE TABLE tests (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING
) TBLPROPERTIES (
  'primary-key' = 'dt,hh,user_id'
);

2)创建分区表

CREATE TABLE tests_p (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING
) PARTITIONED BY (dt, hh) 
TBLPROPERTIES (
  'primary-key' = 'dt,hh,user_id'
);

通过配置partition.expiration-time,可以自动删除过期的分区。

如果定义了主键,则分区字段必须是主键的子集。

可以定义以下三类字段为分区字段:

创建时间(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。

事件时间:事件时间是原表中的一个字段。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs,它们都是完整的CDC数据,包括UPDATE_BEFORE记录,即使你声明了包含分区字段的主键,也能达到独特的效果。

CDC op_ts:不能定义为分区字段,无法知道之前的记录时间戳。

3)Create Table As

表可以通过查询的结果创建和填充,例如,我们有一个这样的sql: CREATE TABLE table_b AS SELECT id, name FORM table_a, 生成的表table_b将相当于创建表并插入数据以下语句:CREATE TABLE table_b(id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a;

使用CREATE TABLE AS SELECT时我们可以指定主键或分区。

CREATE TABLE tests1(
  user_id BIGINT,
  item_id BIGINT
);
CREATE TABLE tests2 AS SELECT * FROM tests1;
-- 指定分区
CREATE TABLE tests2_p PARTITIONED BY (dt) AS SELECT * FROM tests_p;
-- 指定配置
CREATE TABLE tests3(
    user_id BIGINT,
    item_id BIGINT
) TBLPROPERTIES ('file.format' = 'orc');
CREATE TABLE tests3_op TBLPROPERTIES ('file.format' = 'parquet') AS SELECT * FROM tests3;
-- 指定主键
CREATE TABLE tests_pk TBLPROPERTIES ('primary-key' = 'dt') AS SELECT * FROM tests;
-- 指定主键和分区
CREATE TABLE tests_all PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM tests_p;

4)表属性

用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。有关此类属性的完整列表,请参阅配置https://paimon.apache.org/docs/master/maintenance/configurations/

CREATE TABLE tbl(
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  hh STRING
) PARTITIONED BY (dt, hh) 
TBLPROPERTIES (
  'primary-key' = 'dt,hh,user_id',
  'bucket' = '2',
  'bucket-key' = 'user_id'
);

4.3.1.2 外部表

外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。

Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。

Spark3仅支持通过Scala API创建外部表。以下 Scala 代码将位于 hdfs:///path/to/table 的表加载到 DataSet 中。

val dataset = spark.read.format(“paimon”).load(“hdfs:///path/to/table”)

4.3.2 修改表

4.3.2.1 修改表

1)更改/添加表属性

ALTER TABLE tests SET TBLPROPERTIES (
  'write-buffer-size' = '256 MB'
);

2)重命名表名称

ALTER TABLE tests1 RENAME TO tests_new;

3)删除表属性

ALTER TABLE tests UNSET TBLPROPERTIES ('write-buffer-size');

4.3.2.2 修改列

1)添加新列

ALTER TABLE tests ADD COLUMNS (c1 INT, c2 STRING);

2)重命名列名称

ALTER TABLE tests RENAME COLUMN c1 TO c0;

3)删除列

ALTER TABLE my_table DROP COLUMNS(c0, c2);

4)更改列的可为空性

CREATE TABLE tests_null(
id INT, 
coupon_info FLOAT NOT NULL
);
-- Spark只支持将not null改为 nullable
ALTER TABLE tests_null ALTER COLUMN coupon_info DROP NOT NULL;

5)更改列注释

ALTER TABLE tests ALTER COLUMN user_id COMMENT 'user id'

6)添加列位置

ALTER TABLE tests ADD COLUMN a INT FIRST;

ALTER TABLE tests ADD COLUMN b INT AFTER a;

注意:这种操作在hive中是不允许的,使用hive catalog无法执行,需要关闭hive的参数限制:

vim /opt/module/hive/conf/hive-site.xml;

<property>
    <name>hive.metastore.disallow.incompatible.col.type.changes</name>
    <value>false</value>
  </property>

重启hive metastore服务。

7)更改列位置

ALTER TABLE tests ALTER COLUMN b FIRST;
ALTER TABLE tests ALTER COLUMN a AFTER user_id;

8)更改列类型

ALTER TABLE tests ALTER COLUMN a TYPE DOUBLE;


目录
相关文章
|
1天前
|
存储 SQL 监控
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
灵犀科技早期基于 Hadoop 构建大数据平台,在战略调整和需求的持续扩增下,数据处理效率、查询性能、资源成本问题随之出现。为此,引入 [Apache Doris](https://doris.apache.org/) 替换了复杂技术栈,升级为集存储、加工、服务为一体的统一架构,实现存储成本下降 60%,计算效率提升超 10 倍的显著成效。
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
|
4天前
|
SQL 存储 分布式计算
Paimon助力数据湖仓架构实时化升级
本次分享由阿里云高级技术专家李劲松介绍Paimon助力数据湖仓架构实时化升级。内容涵盖四个部分:1) 数据架构的存储演进,介绍Data LakeHouse结合的优势;2) Paimon实时数据湖,强调其批流一体和高效处理能力;3) 数据湖的实时流式处理,展示Paimon在时效性提升上的应用;4) 数据湖非结构化处理,介绍Paimon对非结构化数据的支持及AI集成。Paimon通过优化存储格式和引入LSM技术,实现了更高效的实时数据处理和查询性能,广泛应用于阿里巴巴内部及各大公司,未来将进一步支持AI相关功能。
|
2月前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
3月前
|
存储 数据挖掘 数据处理
Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析
【10月更文挑战第8天】随着数据湖技术的发展,越来越多企业开始利用这一技术优化数据处理。Apache Paimon 是一款高性能的数据湖框架,支持流式和批处理,适用于实时数据分析。本文分享了巴别时代在构建基于 Paimon 的 Streaming Lakehouse 的探索和实践经验,包括示例代码和实际应用中的优势与挑战。
135 1
|
5月前
|
存储 数据挖掘 数据处理
【破晓数据湖新时代!】巴别时代揭秘:Apache Paimon 打造 Streaming Lakehouse 的神奇之旅!
【8月更文挑战第9天】随着数据湖技术的发展,企业积极探索优化数据处理的新途径。Apache Paimon 作为一款高性能数据湖框架,支持流式与批处理,适用于实时数据分析。本文分享巴别时代使用 Paimon 构建 Streaming Lakehouse 的实践经验。Paimon 统一了数据存储与查询方式,对构建实时数据管道极具价值。
244 3
|
5月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
流计算引擎数据问题之Apache Kafka Streams 没有采用低水印方案如何解决
63 0
|
5月前
|
消息中间件 Kafka Apache
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
流计算引擎数据问题之Apache Flink 的完整性推理方案设计如何解决
69 0
|
6月前
|
SQL 数据处理 Apache
Apache Flink SQL:实时计算的核心引擎
Apache Flink SQL 的一些核心功能,并探讨了其在实时计算领域的应用。随着 Flink 社区的不断发展和完善,Flink SQL 将变得越来越强大,为实时数据分析带来更多的可能性。
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
2月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
59 0

推荐镜像

更多