开源湖仓一体平台(一):LakeSoul

本文涉及的产品
对象存储 OSS,20GB 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
简介: 开源湖仓一体平台(一):LakeSoul

LakeSoul 简介
Spark + Lakesoul CDC 入湖
LakeSoul Flink CDC 整库千表同步
数据更新 (Upsert) 和 Merge UDF 使用
多流合并构建宽表
Kafka 多 topic 数据入 LakeSoul

增删改查

 

lakeSoul简介

 

LakeSoul 是由数元灵科技研发的云原生湖仓一体框架,具备高可扩展的元数据管理、ACID 事务、高效灵活的 upsert 操作、Schema 演进和批流一体化处理等特性。

主要特性:

  • 弹性架构:计算存储完全分离,不需要固定节点和磁盘,计算存储各自弹性扩容。并且针对云存储做了大量优化,在对象存储上实现了并发一致性、增量更新等功能;使用 LakeSoul 不需要维护固定的存储节点,云上对象存储的成本只有本地磁盘的 1/10,极大地降低了存储成本和运维成本;
  • 高效可扩展的元数据管理:LakeSoul 使用 Postgres 数据库来管理文件元数据,可以高效的处理元数据的修改,并能够支持多并发写入,解决了 Hive 等元数据层的性能瓶颈,如长时间运行后元数据解析缓慢的痛点。元数据层的表结构经过精心设计,所有读写操作都能够使用主键索引,达到很高的 Ops。同时,元数据库在云上也能够很容易地进行扩容。
  • ACID 事务:通过元数据库事务机制实现了两阶段提交协议,保证了流批一体提交的事务性,用户不会看到不一致数据;支持多并发写入,自动冲突处理机制;
  • 多级分区模式和高效灵活的 upsert 操作:LakeSoul 支持 range 和 hash 分区,通过灵活的 upsert 功能,支持行、列级别的增、删、改等更新操作,将 upsert 数据以 delta file 的形式保存,大幅提高了写数据效率和并发性,而优化过的 merge scan 提供了高效的 MergeOnRead 读取性能;
  • 批流一体:LakeSoul 支持 streaming sink,可以同时处理流式数据摄入和历史数据批量回填、交互式查询等场景;
  • Schema 演进:支持新增、删除列,并在读取时自动兼容旧数据;
  • CDC 流、日志流自动同步:支持 MySQL 整库千表同步,自动建表和自动 Schema 变更;支持 Kafka 多 topic 合并同步、自动 Schema 解析、自动新 Topic 感知;
  • 云对象存储 IO 优化:使用 Rust Arrow 实现原生 Parquet IO,并对对象存储访问做了专门优化;


适用场景:

  • 构建实时湖仓,并且新增数据需要高效实时大批量写入,同时需要行、列级别的并发增量更新的场景;
  • 历史数据存储量很大,并且需要对大跨度时间范围做明细查询、修改,同时希望使用对象存储控制成本的场景;
  • 查询请求不固定,资源消耗变化较大,希望计算资源能够独立弹性伸缩的场景;
  • 需要多并发写,同时文件数量多,对元数据性能和并发有较高要求的场景;
  • 针对主键进行数据更新,对写吞吐有较高有求的场景;

 

spark + LakeSoul CDC入湖

 

使用 Spark Streaming,消费 Kafka 数据并同步更新至 LakeSoul

1:配置 LakeSoul 元数据库 

自 2.1.0 起,LakeSoul Spark 和 Flink 的 jar 包通过 shade 方式打包了 Postgres Driver,Driver 的名字是 com.lakesoul.shaded.org.postgresql.Driver,而在 2.0.1 版本之前,Driver 还没有 shaded 打包,名字是 org.postgresql.Driver

使用 LakeSoul 之前还需要初始化元数据表结构:

PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql

LakeSoul 使用 lakesoul_home (大小写均可) 环境变量或者 lakesoul_home JVM Property (只能全小写)来定位元数据库的配置文件,配置文件中主要包含 PostgreSQL DB 的连接信息。一个示例配置文件:

lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver
lakesoul.pg.url=jdbc:postgresql://localhost:5432/lakesoul_test?stringtype=unspecified
lakesoul.pg.username=lakesoul_test
lakesoul.pg.password=lakesoul_test

如果找不到上述环境变量或 JVM Property,则会分别查找 LAKESOUL_PG_DRIVERLAKESOUL_PG_URLLAKESOUL_PG_USERNAMELAKESOUL_PG_PASSWORD 这几个环境变量作为配置的值。

2:设置 Spark 工程作业

LakeSoul 目前支持 Spark 3.1.2 + Scala 2.12

使用 spark-shellpyspark 或者 spark-sql 交互式查询, 需要添加 LakeSoul 的依赖和配置,有两种方法:

使用 --packages 传 Maven 仓库和包名

spark-shell --packages com.dmetasoul:lakesoul-spark:2.1.1-spark-3.1.2

使用打包好的 LakeSoul 包

可以从 Releases 页面下载已经打包好的 LakeSoul Jar 包。下载 jar 并传给 spark-submit 命令:

spark-submit --jars "lakesoul-spark-2.1.1-spark-3.1.2.jar"

将 Jar 包放在 Spark 环境中:将 Jar 包下载后,放在 $SPARK_HOME/jars 中

maven依赖

<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul-spark</artifactId>
    <version>2.1.1-spark-3.1.2</version>
</dependency>

Spark 作业设置 lakesoul_home 环境变量

export lakesoul_home=/path/to/lakesoul.properties
  • 对于 Hadoop Yarn 集群, 增加命令行参数 --conf spark.yarn.appMasterEnv.lakesoul_home=lakesoul.properties --files /path/to/lakesoul.properties
  • 对于 K8s 集群,增加命令行参数 --conf spark.kubernetes.driverEnv.lakesoul_home=lakesoul.properties --files /path/to/lakesoul.properties to spark-submit command.

设置 Spark SQL Extension

LakeSoul 通过 Spark SQL Extension 机制来实现一些查询计划改写的扩展,需要为 Spark 作业增加以下配置:

spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension

设置 Spark 的 Catalog

LakeSoul 实现了 Spark 3 的 CatalogPlugin 接口,可以作为独立的 Catalog 插件让 Spark 加载。在 Spark 作业中增加如下配置:

spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

该配置增加了一个名为 lakesoul 的 Catalog。为了方便 SQL 中使用,也可以将该 Catalog 设置为默认的 Catalog:

spark.sql.defaultCatalog=lakesoul

通过如上配置,默认会通过 LakeSoul Catalog 来查找所有 database 和表。如果需要同时访问 Hive 等外部 catalog,需要在表名前加上对应 catalog 名字。例如在 Spark 中启用 Hive 作为 Session Catalog,则访问 Hive 表时需要加上 spark_catalog 前缀。

spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

从 2.1.0 起 LakeSoul 的 Catalog 更改为非 session 的实现。你仍然可以将 LakeSoul 设置为 Session Catalog,即设置名为 spark_catalog ,但是这样就无法再访问到 Hive 表。

自 2.0 版本起,LakeSoul 支持将 Compaction 后的目录路径,挂载到指定的 Hive 表,指定和 LakeSoul 分区名一致和自定义分区名两种功能。该功能可以方便下游一些只能支持访问 Hive 的系统读取到 LakeSoul 的数据。更推荐的方式是通过 Kyuubi 来支持 Hive JDBC,这样可以直接使用 Hive JDBC 调用 Spark 引擎来访问 LakeSoul 表,包括 Merge on Read 读取。

lakeSoulTable.compaction("date='2021-01-02'", "spark_catalog.default.hive_test_table", "date='20210102'")

3:启动SparkShell

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

4:创建lakesoul表

我们创建一个 LakeSoul 表 MysqlCdcTest,这个表会准实时流批一体地同步 MySQL 的数据。这个表同样使用 id 列作为主键,用 "op" 列表示CDC的更新。并且我们需要通过 lakesoul_cdc_change_column 这个表属性,指定 LakeSoul 表中,表示 CDC 状态更新的列名,这个示例中该列的名字为 "op"。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()

5:启动 streaming 写入 LakeSoul读取 kafka,转换 Debezium 读取出的 json 格式,使用lakesoul upsert更新 LakeSoul 表:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
var strList = List.empty[String]
//js1 是示例数据,我们这里也用于生成schema,在下文from_json函数中转换数据使用,before和after中内容对应于mysql表字段
val js1 = """{
        |  "before": {
        |    "id": 2,
        |    "rangeid": 2,
        |    "value": "sms"
        |  },
        |  "after": {
        |    "id": 2,
        |    "rangeid": 2,
        |    "value": "sms"
        |  },
        |  "source": {
        |    "version": "1.8.0.Final",
        |    "connector": "mysql",
        |    "name": "cdcserver",
        |    "ts_ms": 1644461444000,
        |    "snapshot": "false",
        |    "db": "cdc",
        |    "sequence": null,
        |    "table": "sms",
        |    "server_id": 529210004,
        |    "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
        |    "file": "binlog.000033",
        |    "pos": 54831329,
        |    "row": 0,
        |    "thread": null,
        |    "query": null
        |  },
        |  "op": "c",
        |  "ts_ms": 1644461444777,
        |  "transaction": null
        |}""".stripMargin
strList = strList :+ js1
val rddData = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
val sche = resultDF.schema
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
//对接kafka 需要指定kafka.bootstrap.server ip地址和debezium输出到kafka的topic
val kfdf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafkahost:9092")
.option("subscribe", "cdcserver.cdc.test")
.option("startingOffsets", "latest")
.load()
//解析debezium中产生的json,对于Mysql insert、update、delete操作会自动生成一列,列名op
val kfdfdata = kfdf
.selectExpr("CAST(value AS STRING) as value")
.withColumn("payload", from_json($"value", sche))
.filter("value is not null")
.drop("value")
.select("payload.after", "payload.before", "payload.op")
.withColumn(
    "op",
    when($"op" === "c", "insert")
    .when($"op" === "u", "update")
    .when($"op" === "d", "delete")
    .otherwise("unknown")
)
.withColumn(
    "data",
    when($"op" === "insert" || $"op" === "update", $"after")
    .when($"op" === "delete", $"before")
)
.drop($"after")
.drop($"before")
.select("data.*", "op")
//使用lakesoul upsert更新表中数据并在屏幕上输出解析后的数据
kfdfdata.writeStream
.foreachBatch { (batchDF: DataFrame, _: Long) =>
    {
    lakeSoulTable.upsert(batchDF)
    batchDF.show
    }
}
.start()
.awaitTermination()

flink cdc同步到LakeSoul

LakeSoul 自 2.1.0 版本起,实现了 Flink CDC Sink,能够支持 Table API 及 SQL (单表),以及 Stream API (整库多表)。目前支持的上游数据源为 MySQL(5.6-8.0)
1:下载 LakeSoul Flink Jar,lakesoul-flink-2.1.1-flink-1.14.jar

2:增加 LakeSoul 元数据库配置

$FLINK_HOME/conf/flink-conf.yaml 中增加如下配置:

containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://192.168.24.180:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://192.168.24.180:5432/test_lakesoul_meta?stringtype=unspecified

注意这里 master 和 taskmanager 的环境变量都需要设置。

注意如果使用 Session 模式来启动作业,即将作业以 client 方式提交到 Flink Standalone Cluster,则 flink run 作为 client,是不会读取上面配置,因此需要再单独配置环境变量,即:

export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root

3:启动同步作业

bin/flink run -c org.apache.flink.lakesoul.entry.MysqlCdc \
    lakesoul-flink-2.1.1-flink-1.14.jar \
    --source_db.host localhost \
    --source_db.port 3306 \
    --source_db.db_name default \
    --source_db.user root \
    --source_db.password root \
    --source.parallelism 4 \
    --sink.parallelism 4 \
    --server_time_zone=Asia/Shanghai
    --warehouse_path s3://bucket/lakesoul/flink/data \
    --flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
    --flink.savepoint s3://bucket/lakesoul/flink/savepoints

4: datastream代码

package org.apache.flink.lakesoul.entry;
import com.dmetasoul.lakesoul.meta.external.mysql.MysqlDBManager;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.BinarySourceRecord;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashSet;
import java.util.List;
import static org.apache.flink.lakesoul.tool.JobOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulDDLSinkOptions.*;
public class MysqlCdc2LakeSoul {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String dbName = parameter.get(SOURCE_DB_DB_NAME.key());
        String userName = parameter.get(SOURCE_DB_USER.key());
        String passWord = parameter.get(SOURCE_DB_PASSWORD.key());
        String host = parameter.get(SOURCE_DB_HOST.key());
        int port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
        String databasePrefixPath = parameter.get(WAREHOUSE_PATH.key());
        String serverTimezone = parameter.get(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue());
        int sourceParallelism = parameter.getInt(SOURCE_PARALLELISM.key());
        int bucketParallelism = parameter.getInt(BUCKET_PARALLELISM.key());
        int checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(),
                                                  JOB_CHECKPOINT_INTERVAL.defaultValue());     //mill second
        MysqlDBManager mysqlDBManager = new MysqlDBManager(dbName,
                                                           userName,
                                                           passWord,
                                                           host,
                                                           Integer.toString(port),
                                                           new HashSet<>(),
                                                           databasePrefixPath,
                                                           bucketParallelism,
                                                           true);
        mysqlDBManager.importOrSyncLakeSoulNamespace(dbName);
        //syncing mysql tables to lakesoul
        List<String> tableList = mysqlDBManager.listTables();
        if (tableList.isEmpty()) {
            throw new IllegalStateException("Failed to discover captured tables");
        }
        tableList.forEach(mysqlDBManager::importOrSyncLakeSoulTable);
        Configuration conf = new Configuration();
        // parameters for mutil tables ddl sink
        conf.set(SOURCE_DB_DB_NAME, dbName);
        conf.set(SOURCE_DB_USER, userName);
        conf.set(SOURCE_DB_PASSWORD, passWord);
        conf.set(SOURCE_DB_HOST, host);
        conf.set(SOURCE_DB_PORT, port);
        conf.set(WAREHOUSE_PATH, databasePrefixPath);
        conf.set(SERVER_TIME_ZONE, serverTimezone);
        // parameters for mutil tables dml sink
        conf.set(LakeSoulSinkOptions.USE_CDC, true);
        conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
        conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
        conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
        StreamExecutionEnvironment env;
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool pt = ParameterTool.fromMap(conf.toMap());
        env.getConfig().setGlobalJobParameters(pt);
        env.enableCheckpointing(checkpointInterval);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(4023);
        CheckpointingMode checkpointingMode = CheckpointingMode.EXACTLY_ONCE;
        if (parameter.get(JOB_CHECKPOINT_MODE.key(), JOB_CHECKPOINT_MODE.defaultValue()).equals("AT_LEAST_ONCE")) {
            checkpointingMode = CheckpointingMode.AT_LEAST_ONCE;
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(parameter.get(FLINK_CHECKPOINT.key()));
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        MySqlSourceBuilder<BinarySourceRecord> sourceBuilder = MySqlSource.<BinarySourceRecord>builder()
                                                                        .hostname(host)
                                                                        .port(port)
                                                                        .databaseList(dbName) // set captured database
                                                                        .tableList(dbName + ".*") // set captured table
                                                                        .serverTimeZone(serverTimezone)  // default -- Asia/Shanghai
                                                                        .username(userName)
                                                                        .password(passWord);
        LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
        context.env = env;
        context.sourceBuilder = sourceBuilder;
        context.conf = conf;
        LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(context);
        DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource();
        Tuple2<DataStream<BinarySourceRecord>, DataStream<BinarySourceRecord>> streams =
                builder.buildCDCAndDDLStreamsFromSource(source);
        DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(streams.f0);
        DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
        DataStreamSink<BinarySourceRecord> ddlSink = builder.buildLakeSoulDDLSink(streams.f1);
        dmlSink.addSink(new LakeSoulMultiTablesSink())
        ddlSink.addSink(new LakeSoulDDLSink())
        env.execute("LakeSoul CDC Sink From MySQL Database " + dbName);
    }
}

5:LakeSoul Flink CDC Sink 严格一次语义保证

LakeSoul Flink CDC Sink 在作业运行过程中会自动保存相关状态,在 Flink 作业发生 Failover 时能够将状态恢复并重新写入,因此数据不会丢失。

LakeSoul 写入时,在两个部分保证写入的幂等性:

  • Stage 文件 Commit 时,与 Flink File Sink 一致,通过文件系统 rename 操作的原子性,来保证 staging 文件写入到最终的路径。因为 rename 是原子的,Failover 之后不会发生重复写入或缺失的情况。
  • LakeSoul 元数据提交时,会首先记录文件路径,在更新 snapshot 时会通过事务标记该文件已提交。Failover 后,通过判断一个文件是否已经提交,可以保证提交的幂等性。

综上,LakeSoul Flink CDC Sink 通过状态恢复保证数据不丢失,通过提交幂等性保证数据不重复,实现了严格一次(Exactly Once)语义保证。

 

数据更新和合并(upsert/merge)

LakeSoul可以支持对已经入湖的数据做部分字段更新功能,而不必将整张数据表全部覆盖重写,避免这种繁重且浪费资源的操作。

1:部分字段更新

举个例子一张表数据信息如下,id为主键(即hashPartitions),目前需要根据主键字段,对phone_number做字段修改处理。

可以使用 upsert 来实现对任意行中任意一个字段的更新。upsert需要包含主键 (id) 和需要修改的 address 信息,再次读取整张表数据 address 便可展示为修改后的字段信息。

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val df = Seq(("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),("2", "Make", "13511110000", "address_2", "job_2", "company_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("1", "address_1_1")).toDF("id","address")
lakeSoulTable.upsert(extraDF)
lakeSoulTable.toDF.show()


2: 自定义Merge合并功能

LakeSoul 默认 merge 规则,即数据更新后取最后一条记录作为该字段数据 (org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.DefaultMergeOp)。在此基础上,LakeSoul 内置扩展了几种数据 merge 逻辑,对 Int/Long 字段做加和 merge(MergeOpInt/MergeOpLong)、对非空字段更新 (MergeNonNullOp)、以","拼接字符串 merge 方式。

下面以对非空字段更新 (MergeNonNullOp) 为例,借用上面表格数据样例。数据写入时同样以 upsert 方式进行更新写入,然后在数据读取时需要注册 merger 逻辑,然后进行读取即可。

import org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeNonNullOp
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val df = Seq(("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),("2", "Make", "13511110000", "address_2", "job_2", "company_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("1", "null", "13100001111", "address_1_1", "job_1_1", "company_1_1"),("2", "null", "13111110000", "address_2_2", "job_2_2", "company_2_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
new MergeNonNullOp().register(spark, "NotNullOp")
lakeSoulTable.toDF.show()
lakeSoulTable.upsert(extraDF)
lakeSoulTable.toDF.withColumn("name", expr("NotNullOp(name)")).show()


用户也可以通过自定义 MergeOperator (实现 trait org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeOperator) 来自定义 Merge 时的逻辑,能够灵活地实现数据高效入湖。

 

多流合并构建宽表

为构建宽表,传统数仓的 ETL 在做多表关联时,需要根据主外键多次 join,然后构建一个大宽表。当数据量较多或需要多次join时,会有效率低下,内存消耗大,容易 OOM 等问题,且 Shuffle 过程占据大部分数据交换时间,效率也很低下。LakeSoul 支持对数据进行 Upsert,并支持自定义 MergeOperator 功能,可以避免上述存在的问题,不必Join即可得到合并结果。下面针对这一场景具体举例进行说明。


假设有以下几个流的数据,A、B、C和D,各个流数据内容如下:

A:  

B:

 


C:

D:


最后需要形成一张大宽表,将四张表进行合并展示,如下:

传统意义上进行上述操作,需要将四张表根据主键(IP)进行三次join,写法如下:

Select 
       A.IP as IP,  
       A.sy as sy, 
       A.us as us, 
       B.free as free, 
       B.cache as cache, 
       C.level as level, 
       C.des as des, 
       D.qps as qps, 
       D.tps as tps 
from A join B on A.IP = B.IP 
    join C on C.IP = A.IP 
    join D on D.IP = A.IP.

LakeSoul 支持多流合并,多个流可以有不同的 Schema (需要有相同主键)。LakeSoul 可以做到自动扩展 Schema,若新写入的数据字段在原表中未存在,则会自动扩展表 schema,不存在的字段默认为null处理。通过使用 LakeSoul 多流合并功能,结合 LakeSoul 独特的 MergeOperator 功能,通过 upsert 将数据写入 LakeSoul 后,不需要 join,即可读取到拼接好的宽表。上述过程代码实现如下:

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()
import spark.implicits._
val df1 = Seq(("1.1.1.1", 30, 40)).toDF("IP", "sy", "us")
val df2 = Seq(("1.1.1.1", 1677, 455)).toDF("IP", "free", "cache")
val df3 = Seq(("1.1.1.2", "error", "killed")).toDF("IP", "level", "des")
val df4 = Seq(("1.1.1.1", 30, 40)).toDF("IP", "qps", "tps")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df1.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","IP")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(df2)
lakeSoulTable.upsert(df3)
lakeSoulTable.upsert(df4)
lakeSoulTable.toDF.show()


 

kafka多topic入LakeSoul

 

通过 LakeSul Kafka Stream 将 Kafka 中的数据同步到 LakeSul 非常方便。

LakeSoul Kafka Stream 可以支持自动创建表,自动识别新 topic,exactly-once 语义、自动为表添加分区等功能。

LakeSoul Kafka Stream 主要使用 Spark Structured Streaming 来实现数据同步功能。

使用 LakeSoul Kafka Stream需要以下条件之一:

       1:topic 中的数据为 json 格式;

       2:Kafka 集群带有 Schema Registry 服务

1. 准备环境

你可以编译 LakeSoul 项目以获取 LakeSoul Kafka Stream jar, 或者可以通过 https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-kafka-stream.tar.gz 来获取 LakeSoul Kafka Stream 以及其他任务运行依赖的jar包。

下载后解压 tar 包,然后将 jar 包放入 $SPARK_HOME/jars 目录下,或者在提交任务时添加依赖的jar,比如通过 --jars。

2. 启动 LakeSoul Kafka Stream 任务

  1. 任务启动时通过 lakesoul_home 环境变量添加元数据库信息. 这部分请参考 搭建本地测试环境
  2. 提交任务。你需要按顺序填写一些参数,以确保任务能够准确运行。参数描述如下:


3. 任务流程示例

1. 假设 Kafka 集群已经存在。在这里,通过 Docker Compose 运行 Kafka 集群。然后创建一个名为 "test" 的主题并向其中写入一些数据。Kafka bootstrap.servers: localhost:9092

# 创建 topic 'test'
bin# ./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# 查看 topic 列表
bin# ./kafka-topics.sh  --list --bootstrap-server localhost:9092
test
# 向 名为 'test' 的topic中写入一些数据
bin# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>{"before":{"id":1,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":1}
>{"before":{"id":2,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":2}
>{"before":{"id":3,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":3}
>{"before":{"id":4,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":4}
>{"before":{"id":5,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":5}

2. 提交 Kafka Stream 任务. 将上述下载的依赖 jars 放到 $SPARK_HOME/jars 目录下.

export lakesoul_home=./pg.properties && ./bin/spark-submit \
--class org.apache.spark.sql.lakesoul.kafka.KafkaStream \
--driver-memory 4g \
--executor-memory 4g \
--master local[4] \
./jars/lakesoul-spark-2.1.1-spark-3.1.2-SNAPSHOT.jar \
localhost:9092 test.* /tmp/kafka/data /tmp/kafka/checkpoint/ kafka earliest false

3. 通过 spark-shell 查看写入 LakeSoul 中的数据

scala> import com.dmetasoul.lakesoul.tables.LakeSoulTable
import com.dmetasoul.lakesoul.tables.LakeSoulTable
scala> val tablepath="/tmp/kafka/data/kafka/test"
tablepath: String = /tmp/kafka/data/kafka/test
scala> val lake = LakeSoulTable.forPath(tablepath)
lake: com.dmetasoul.lakesoul.tables.LakeSoulTable = com.dmetasoul.lakesoul.tables.LakeSoulTable@585a2ad9
scala> lake.toDF.show(false)
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|after                             |before                            |op |source                                                                                                                                                                                                                                                                                                 |transaction|ts_ms        |
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|{"id":2,"rangeid":2,"value":"sms"}|{"id":1,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|1          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":3,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|3          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":5,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|5          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":2,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|2          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":4,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|4          |1644461444777|
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+

4. 创建新的 topic 'test_1' 并向其中写入一些数据

# 创建 topic 'test_1' 
bin# ./kafka-topics.sh --create --topic test_1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# 查看 topic list
bin# ./kafka-topics.sh  --list --bootstrap-server localhost:9092
test
test_1
# 向 topic 'test_1' 中写入一些数据
bin# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_1
>{"before":{"id":1,"rangeid":2,"value":"sms"},"after":{"id":1,"rangeid":1,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":1}
>{"before":{"id":2,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":2}
>{"before":{"id":3,"rangeid":2,"value":"sms"},"after":{"id":3,"rangeid":3,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":3}
>{"before":{"id":4,"rangeid":2,"value":"sms"},"after":{"id":4,"rangeid":4,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":4}
>{"before":{"id":5,"rangeid":2,"value":"sms"},"after":{"id":5,"rangeid":5,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":5}

5. 创建新的 topic 后,数据同步到 Lakesoul 需要5分钟时间。5分钟后查看 LakeSoul 中的数据

scala> val tablepath_1="/tmp/kafka/data/kafka/test_1"
tablepath_1: String = /tmp/kafka/data/kafka/test_1
scala> val lake_1 = LakeSoulTable.forPath(tablepath_1)
lake: com.dmetasoul.lakesoul.tables.LakeSoulTable = com.dmetasoul.lakesoul.tables.LakeSoulTable@43900101
lake_1.toDF.show(false)
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|after                             |before                            |op  |source                                                                                                                                                                                                                                                                                                 |transaction|ts_ms        |
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|{"id":2,"rangeid":2,"value":"sms"}|{"id":2,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|2          |1644461444777|
|{"id":1,"rangeid":1,"value":"sms"}|{"id":1,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|1          |1644461444777|
|{"id":4,"rangeid":4,"value":"sms"}|{"id":4,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|4          |1644461444777|
|{"id":3,"rangeid":3,"value":"sms"}|{"id":3,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|3          |1644461444777|
|{"id":5,"rangeid":5,"value":"sms"}|{"id":5,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|5          |1644461444777|
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+

6. 如果 Kafka 集群使用 Schema Registry服务,提交任务时参数最后需要填写 schema registry 服务地址。

export lakesoul_home=./pg.properties && ./bin/spark-submit \
--class org.apache.spark.sql.lakesoul.kafka.KafkaStream \
--driver-memory 4g \
--executor-memory 4g \
--master local[4] \
./jars/lakesoul-spark-2.1.1-spark-3.1.2-SNAPSHOT.jar \
localhost:9092 test.* /tmp/kafka/data /tmp/kafka/checkpoint/ kafka earliest false http://localhost:8081

增删改查

LakeSoul共支持四种commit操作:mergeCommit;appendCommit;compactCommit;updateCommit,对于update操作由于历史数据每次合并并生成新文件,难以获取增量文件,因此不支持增量查询。

1.分区信息
option(LakeSoulOptions.PARTITION_DESC, "range=range1")
如果未指定分区信息,则默认针对所有分区进行增量查询。
2.起始和结束时间戳
option(LakeSoulOptions.READ_START_TIME, "2022-01-01 15:15:15")
option(LakeSoulOptions.READ_END_TIME, "2022-01-01 20:15:15")
3.读类型
option(LakeSoulOptions.READ_TYPE, "incremental")
可以指定增量读"incremental",快照读"snapshot",不指定默认全量读。

增量读支持简单的upsert场景和CDC场景下的增量读,有两种方式,一种是通过调用LakeSoulTable.forPath()函数进行查询,另一种是通过spark.read指定选项进行增量读,可以获得指定分区在起止时间范围内的增量数据,获取的增量数据时间区间为前闭后开。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
// 针对给定range和时间点,进行增量读取,incremental表示增量读
// 读取range1分区在2023-01-01 15:15:15到2023-01-01 20:15:15时间范围内的增量数据
// 第一种方式,通过forPath进行增量读
val lake1 = LakeSoulTable.forPath(tablePath, "range=range1", "2023-01-01 15:15:15", "2023-01-01 20:15:15", "incremental")
val data1 = lake1.toDF.select("range", "hash", "op").toDF().show()
// 第二种方式,通过spark.read指定选项进行增量读
val lake2 = spark.read.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
  .option(LakeSoulOptions.READ_START_TIME, "2023-01-01 15:15:15")
  .option(LakeSoulOptions.READ_END_TIME, "2023-01-01 20:15:15")
  .option(LakeSoulOptions.READ_TYPE, "incremental")
  .load(tablePath)
val data2 = lake2.toDF.select("range", "hash", "op").toDF().show()

流式读LakeSoul支持 Spark Structured Streaming read,流式读基于增量查询,通过spark.readStream指定选项进行流式读,可以获得实时数据流中指定分区下每一批次更新的增量数据。指定的起始时间需要早于实时数据的摄入时间。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
// 通过spark.readStream指定选项进行流式读,读取range1分区在2023-01-01 15:15:15及之后的增量数据,每1秒触发一次读取,将结果输出到控制台
spark.readStream.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
  .option(LakeSoulOptions.READ_START_TIME, "2022-01-01 15:15:15")
  .load(tablePath)
  .writeStream.format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
  .awaitTermination()

创建和写入LakeSoulTable

1:Table Name

LakeSoul 中表名可以是一个路径,数据存储的目录就是 LakeSoulTable 的表名。同时一个表可以有一个表名帮助记忆,或在SQL中访问,即不是路径形式的一个字符串。

当调用 Dataframe.write.save 方法向 LakeSoulTable 写数据时,若表不存在,则会使用存储路径自动创建新表,但是默认没有表名,只能通过路径访问,可以通过添加 option("shortTableName", "table_name") 选项来设置表名。

通过 DataFrame.write.saveAsTable,会创建表,可以通过表名访问,路径默认为 spark.sql.warehouse.dir/current_database/table_name,后续可以通过路径或表名访问。如需自定义表路径,则可以加上 option("path", "s3://bucket/...") 选项。

通过 SQL 建表时,表名可以是路径或一个表名,路径必须是绝对路径。如果是表名,则路径的规则和上面 Dataframe.write.saveAsTable 一致,可以在 CREATE TABLE SQL 中通过 LOCATION 子句设置。

2:Partition

LakeSoulTable 有两种分区方式,分别是 range 分区和 hash 分区,可以两种分区同时使用。

  • range 分区即通常的基于时间的表分区,不同分区的数据文件存储在不同的分区路径下;
  • 使用 hash 分区,必须同时指定 hash 分区主键字段和 hash bucket num,在写数据时,会根据 bucket num 对 hash 主键字段值进行散列,取模后相同数据会写到同一个文件,文件内部根据 hash 字段值升序排列;
  • 若同时指定了 range 分区和 hash 分区,则每个 range 分区内,hash 值相同的数据会写到同一个文件里;
  • 指定分区后,写入 LakeSoulTable 的数据必须包含分区字段。

可以根据具体场景选择使用 range 分区或 hash 分区,或者同时使用两者。当指定 hash 分区后,LakeSoulTable 的数据将根据主键唯一,主键字段为 hash 分区字段 + range 分区字段(如果存在)。

当指定 hash 分区时,LakeSoulTable 支持 upsert 操作 (scala/sql),此时 append 模式写数据被禁止,可以使用 LakeSoulTable.upsert() 方法或者 MERGE INTO SQL 语句。

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  // 使用 SQL 功能还需要增加以下两个配置项
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//create table
//spark batch
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
//spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minutes"))
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum", "2")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start(tablePath)
writeStream.awaitTermination()
//对于已存在的表,写数据时不需要再指定分区信息
//相当于 insert overwrite partition,如果不指定 replaceWhere,则会重写整张表
df.write
  .mode("overwrite")
  .format("lakesoul")
  .option("replaceWhere","date='2021-01-01'")
  .save(tablePath)

Read LakeSoulTable可以通过 Spark read api 或者构建 LakeSoulTable 来读取数据,LakeSoul 也支持通过 Spark SQL 读取数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//方法一
val df1 = spark.read.format("lakesoul").load(tablePath)
//方法二
val df2 = LakeSoulTable.forPath(tablePath).toDF

Upsert LakeSoulTable

Batch

当 LakeSoulTable 使用 hash 分区时,支持 upsert 功能。

默认情况下使用 MergeOnRead 模式,upsert 数据以 delta file 的形式写入表路径,LakeSoul 提供了高效的 upsert 和 merge scan 性能。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("2021-01-01",3,"chicken")).toDF("date","id","name")
lakeSoulTable.upsert(extraDF)

Streaming(见上文,流式读)

Update LakeSoulTableLakeSoul 支持 update 操作,通过指定条件和需要更新的字段 expression 来执行。有多种方式可以执行 update,详见 LakeSoulTable 注释。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
import org.apache.spark.sql.functions._
//update(condition, set)
lakeSoulTable.update(col("date") > "2021-01-01", Map("data" -> lit("2021-01-02")))

Delete Data

LakeSoul 支持 delete 操作删除符合条件的数据,条件可以是任意字段,若不指定条件,则会删除全表数据。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除符合条件的数据
lakeSoulTable.delete("date='2021-01-01'")
//删除全表数据
lakeSoulTable.delete()

Compaction

执行 upsert 会生成 delta 文件,当 delta 文件过多时,会影响读取效率,此时可以执行 compaction 合并文件。

当执行全表 compaction 时,可以给 compaction 设置条件,只有符合条件的 range 分区才会执行 compaction 操作。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//对指定分区执行 compaction 操作
lakeSoulTable.compaction("date='2021-01-01'")
//对全表所有分区执行 compaction 操作
lakeSoulTable.compaction()
//对全表所有分区执行 compaction 操作,会检测是否符合执行 compaction 的条件,只有符合条件的才会执行
lakeSoulTable.compaction(false)

Compaction 后挂载到 Hive Meta

自 2.0 版本起,LakeSoul 支持将 Compaction 后的目录路径,挂载到指定的 Hive 表,并保持所有 Range 分区名不变和自定义分区名功能。该功能可以方便下游一些只能支持访问 Hive 的系统读取到 LakeSoul 的数据。更推荐的方式是通过 Kyuubi 来支持 Hive JDBC,这样可以直接使用 Hive JDBC 调用 Spark 引擎来访问 LakeSoul 表,包括 Merge on Read 读取

要使用 LakeSoul 导出分区到 Hive Meta 的功能,保持 hive 分区名不变,可以执行如下 Compaction 调用:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val lakeSoulTable = LakeSoulTable.forName("lakesoul_test_table")
lakeSoulTable.compaction("date='2021-01-01'", "spark_catalog.default.hive_test_table")
# 自定义 hive 分区名,可以执行如下 Compaction 调用:
lakeSoulTable.compaction("date='2021-01-02'", "spark_catalog.default.hive_test_table", "date='20210102'")

注意 如果将 LakeSoul Catalog 设置为了 Spark 默认 Catalog,则 Hive 表名前面需要加上 spark_catalog

使用 Spark SQL 操作 LakeSoulTable

LakeSoul 支持 Spark SQL 读写数据,使用时需要设置 spark.sql.catalog.lakesoulorg.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog。同时也可以将 LakeSoul 设置为默认 Catalog,即增加 spark.sql.defaultCatalog=lakesoul 配置项。需要注意的是:

DDL SQL

# 创建主键表,需要通过 TBLPROPERTIES 设置主键名和哈希分桶数,没有设置则为非主键表
# 创建主键CDC表,需要增加表属性 `'lakesoul_cdc_change_column'='change_kind'`,具体请参考 [LakeSoul CDC 表](../Usage%20Doc/04-cdc-ingestion-table.mdx)
CREATE TABLE default.table_name (id string, date string, data string) USING lakesoul
    PARTITIONED BY (date)
    LOCATION 's3://bucket/table_path'
    TBLPROPERTIES(
      'hashPartitions'='id',
      'hashBucketNum'='2')

同时也支持使用 ALTER TABLE 增加或删除列,该部分与 Spark SQL 语法相同,暂不支持修改列的类型。

DML SQL

# INSERT INTO
insert overwrite/into table default.table_name partition (date='2021-01-01') select id from tmpView
# MERGE INTO
# 对主键表,可以通过 `Merge Into` 语句来实现 Upsert
# 暂不支持 Merge Into 中 MATCHED/NOT MATCHED 带条件的语句
# ON 子句只能包含主键相等的表达式,不支持非主键列连接,不支持非相等表达式
MERGE INTO default.`table_name` AS t USING source_table AS s
    ON t.hash = s.hash
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *

注意

  • 表名前可以添加 database(namespace) 名,默认为当前 USE 的 database 名,没有执行过 USE database 则为 default
  • 可以使用 LOCATION 子句或 path 表属性来设置表路径,如果没有设置路径,则默认为 spark.sql.warehouse.dir/database_name/table_name
  • 可以使用表路径来读写一个 LakeSoul 表,在 SQL 中表名部分需要写成 lakesoul.default.table_path

Operator on Hash Primary Keys

指定 hash 分区后,LakeSoul 各 range 分区内的数据根据 hash 主键字段分片且分片数据有序,因此部分算子作用于 hash 主键字段时,无需 shuffle 和 sort。

LakeSoul 目前支持 join、intersect 和 except 算子的优化,后续将支持更多算子。

1 Join on Hash Keys

支持的场景:

  • 对于同一张表,不同分区的数据根据 hash 字段进行 join 时,无需 shuffle 和 sort
  • 若两张不同表的 hash 字段类型和字段数量相同,且 hash bucket 数量相同,它们之间根据 hash 字段进行 join 时,也无需 shuffle 和 sort

2 Intersect/Except on Hash Keys

支持的场景:

  • 对同一张表不同分区的 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct
  • 对两张不同的表,若它们拥有相同的 hash 字段类型和字段数量且 hash bucket 数量相同,对 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct

range 分区内,hash 主键字段值是唯一的,因此 intersect 或 except 的结果是不重复的,后续操作不需要再次去重,例如可以直接 count 获取不重复数据的数量,无需 count distinct

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._
val df1 = Seq(("2021-01-01",1,1,"rice"),("2021-01-02",2,2,"bread")).toDF("date","id1","id2","name")
val df2 = Seq(("2021-01-01",1,1,2.7),("2021-01-02",2,2,1.3)).toDF("date","id3","id4","price")
val tablePath1 = "s3a://bucket-name/table/path/is/also/table/name/1"
val tablePath2 = "s3a://bucket-name/table/path/is/also/table/name/2"
df1.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id1,id2")
  .option("hashBucketNum","2")
  .save(tablePath1)
df2.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id3,id4")
  .option("hashBucketNum","2")
  .save(tablePath2)
//join on hash keys without shuffle and sort
//相同表的不同 range 分区
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath1` where date='2021-01-02') t2
    | on t1.id1=t2.id1 and t1.id2=t2.id2
  """.stripMargin)
    .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath2` where date='2021-01-01') t2
    | on t1.id1=t2.id3 and t1.id2=t2.id4
  """.stripMargin)
  .show()
//intersect/except on hash keys without shuffle,sort and distinct
//相同表的不同 range 分区
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-02') t
  """.stripMargin)
  .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id3,id4 from lakesoul.`$tablePath2` where date='2021-01-01') t
  """.stripMargin)
  .show()

scheam演进

LakeSoul 支持 schema 演进功能,可以新增列 (分区字段无法修改)。新增列后,读取现有数据,该新增列会是 NULL。你可以通过使用 upsert 功能,为现有数据追加该新列。 

在写数据时指定 mergeSchematrue,或者启用 autoMerge 来 merge schema,新的 schema 为表原本 schema 和当前写入数据 schema 的并集。

df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  //方式一
  .option("mergeSchema","true")
  .save(tablePath)
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  //方式二
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()

Drop Partition删除分区,也就是删除 range 分区,实际上并不会真正删掉数据文件,可以使用 cleanup 功能清理失效数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除指定 range 分区
lakeSoulTable.dropPartition("date='2021-01-01'")

Drop Table删除表会直接删除表的所有 meta 数据和文件数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除表
lakeSoulTable.dropTable()
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
1月前
|
SQL 存储 分布式计算
ODPS技术架构深度剖析与实战指南——从零开始掌握阿里巴巴大数据处理平台的核心要义与应用技巧
【10月更文挑战第9天】ODPS是阿里巴巴推出的大数据处理平台,支持海量数据的存储与计算,适用于数据仓库、数据挖掘等场景。其核心组件涵盖数据存储、计算引擎、任务调度、资源管理和用户界面,确保数据处理的稳定、安全与高效。通过创建项目、上传数据、编写SQL或MapReduce程序,用户可轻松完成复杂的数据处理任务。示例展示了如何使用ODPS SQL查询每个用户的最早登录时间。
90 1
|
7天前
|
SQL 数据采集 分布式计算
【赵渝强老师】基于大数据组件的平台架构
本文介绍了大数据平台的总体架构及各层的功能。大数据平台架构分为五层:数据源层、数据采集层、大数据平台层、数据仓库层和应用层。其中,大数据平台层为核心,负责数据的存储和计算,支持离线和实时数据处理。数据仓库层则基于大数据平台构建数据模型,应用层则利用这些模型实现具体的应用场景。文中还提供了Lambda和Kappa架构的视频讲解。
【赵渝强老师】基于大数据组件的平台架构
|
20天前
|
SQL 存储 数据挖掘
快速入门:利用AnalyticDB构建实时数据分析平台
【10月更文挑战第22天】在大数据时代,实时数据分析成为了企业和开发者们关注的焦点。传统的数据仓库和分析工具往往无法满足实时性要求,而AnalyticDB(ADB)作为阿里巴巴推出的一款实时数据仓库服务,凭借其强大的实时处理能力和易用性,成为了众多企业的首选。作为一名数据分析师,我将在本文中分享如何快速入门AnalyticDB,帮助初学者在短时间内掌握使用AnalyticDB进行简单数据分析的能力。
32 2
|
1月前
|
分布式计算 大数据 Serverless
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
在2024云栖大会开源大数据专场上,阿里云宣布推出实时计算Flink产品的新一代向量化流计算引擎Flash,该引擎100%兼容Apache Flink标准,性能提升5-10倍,助力企业降本增效。此外,EMR Serverless Spark产品启动商业化,提供全托管Serverless服务,性能提升300%,并支持弹性伸缩与按量付费。七猫免费小说也分享了其在云上数据仓库治理的成功实践。其次 Flink Forward Asia 2024 将于11月在上海举行,欢迎报名参加。
176 1
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
|
30天前
|
机器学习/深度学习 监控 搜索推荐
电商平台如何精准抓住你的心?揭秘大数据背后的神秘推荐系统!
【10月更文挑战第12天】在信息爆炸时代,数据驱动决策成为企业优化决策的关键方法。本文以某大型电商平台的商品推荐系统为例,介绍其通过收集用户行为数据,经过预处理、特征工程、模型选择与训练、评估优化及部署监控等步骤,实现个性化商品推荐,提升用户体验和销售额的过程。
73 1
|
1月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
1月前
|
DataWorks 数据挖掘 关系型数据库
基于hologres搭建轻量OLAP分析平台解决方案评测
一文带你详细了解基于hologres搭建轻量OLAP分析平台解决方案的优与劣
182 8
|
2月前
|
数据可视化 数据挖掘 OLAP
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
【9月更文第6天】开作为互联网手游公司的产品经理和项目经理,数据分析对于我们的业务至关重要。我们一直在寻找高效、可靠的数据分析解决方案,以更好地了解玩家行为、优化游戏体验和提升运营效率。近期,我们体验并部署了《基于 Hologres 搭建轻量 OLAP 分析平台》解决方案,以下是我们对该方案的评测报告。
84 12
基于 Hologres 搭建轻量 OLAP 分析平台评测报告
|
2月前
|
SQL 人工智能 DataWorks
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
在9月21日的云栖大会上,DataWorks发布了新一代智能湖仓一体数据开发与治理平台。DataWorks历经Kubernetes改造与云原生调度系统的优化,实现了资源组全面Serverless化,降低了使用成本,最高可节省40%。新推出的DataWorks Data Studio,支持多种计算引擎,提供更开放的云原生WebIDE,提升开发效率。DataWorks Copilot智能助手也得到升级,支持多种SQL方言和Python代码生成,平均提升数据开发效率35%。此外,DataWorks还推出了全方位的数据资产治理体系,涵盖业务和技术视角,助力企业实现数据智能化管理和转型。
331 0
【云栖实录】DataWorks:新一代智能湖仓一体数据开发与治理平台
|
3月前
|
搜索推荐 OLAP 流计算
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
OneSQL OLAP实践问题之基于 Flink 打造流批一体的数据计算平台如何解决
55 1

热门文章

最新文章