开源湖仓一体平台(一):LakeSoul

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
简介: 开源湖仓一体平台(一):LakeSoul

LakeSoul 简介
Spark + Lakesoul CDC 入湖
LakeSoul Flink CDC 整库千表同步
数据更新 (Upsert) 和 Merge UDF 使用
多流合并构建宽表
Kafka 多 topic 数据入 LakeSoul

增删改查

 

lakeSoul简介

 

LakeSoul 是由数元灵科技研发的云原生湖仓一体框架,具备高可扩展的元数据管理、ACID 事务、高效灵活的 upsert 操作、Schema 演进和批流一体化处理等特性。

主要特性:

  • 弹性架构:计算存储完全分离,不需要固定节点和磁盘,计算存储各自弹性扩容。并且针对云存储做了大量优化,在对象存储上实现了并发一致性、增量更新等功能;使用 LakeSoul 不需要维护固定的存储节点,云上对象存储的成本只有本地磁盘的 1/10,极大地降低了存储成本和运维成本;
  • 高效可扩展的元数据管理:LakeSoul 使用 Postgres 数据库来管理文件元数据,可以高效的处理元数据的修改,并能够支持多并发写入,解决了 Hive 等元数据层的性能瓶颈,如长时间运行后元数据解析缓慢的痛点。元数据层的表结构经过精心设计,所有读写操作都能够使用主键索引,达到很高的 Ops。同时,元数据库在云上也能够很容易地进行扩容。
  • ACID 事务:通过元数据库事务机制实现了两阶段提交协议,保证了流批一体提交的事务性,用户不会看到不一致数据;支持多并发写入,自动冲突处理机制;
  • 多级分区模式和高效灵活的 upsert 操作:LakeSoul 支持 range 和 hash 分区,通过灵活的 upsert 功能,支持行、列级别的增、删、改等更新操作,将 upsert 数据以 delta file 的形式保存,大幅提高了写数据效率和并发性,而优化过的 merge scan 提供了高效的 MergeOnRead 读取性能;
  • 批流一体:LakeSoul 支持 streaming sink,可以同时处理流式数据摄入和历史数据批量回填、交互式查询等场景;
  • Schema 演进:支持新增、删除列,并在读取时自动兼容旧数据;
  • CDC 流、日志流自动同步:支持 MySQL 整库千表同步,自动建表和自动 Schema 变更;支持 Kafka 多 topic 合并同步、自动 Schema 解析、自动新 Topic 感知;
  • 云对象存储 IO 优化:使用 Rust Arrow 实现原生 Parquet IO,并对对象存储访问做了专门优化;


适用场景:

  • 构建实时湖仓,并且新增数据需要高效实时大批量写入,同时需要行、列级别的并发增量更新的场景;
  • 历史数据存储量很大,并且需要对大跨度时间范围做明细查询、修改,同时希望使用对象存储控制成本的场景;
  • 查询请求不固定,资源消耗变化较大,希望计算资源能够独立弹性伸缩的场景;
  • 需要多并发写,同时文件数量多,对元数据性能和并发有较高要求的场景;
  • 针对主键进行数据更新,对写吞吐有较高有求的场景;

 

spark + LakeSoul CDC入湖

 

使用 Spark Streaming,消费 Kafka 数据并同步更新至 LakeSoul

1:配置 LakeSoul 元数据库 

自 2.1.0 起,LakeSoul Spark 和 Flink 的 jar 包通过 shade 方式打包了 Postgres Driver,Driver 的名字是 com.lakesoul.shaded.org.postgresql.Driver,而在 2.0.1 版本之前,Driver 还没有 shaded 打包,名字是 org.postgresql.Driver

使用 LakeSoul 之前还需要初始化元数据表结构:

PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql

LakeSoul 使用 lakesoul_home (大小写均可) 环境变量或者 lakesoul_home JVM Property (只能全小写)来定位元数据库的配置文件,配置文件中主要包含 PostgreSQL DB 的连接信息。一个示例配置文件:

lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver
lakesoul.pg.url=jdbc:postgresql://localhost:5432/lakesoul_test?stringtype=unspecified
lakesoul.pg.username=lakesoul_test
lakesoul.pg.password=lakesoul_test

如果找不到上述环境变量或 JVM Property,则会分别查找 LAKESOUL_PG_DRIVERLAKESOUL_PG_URLLAKESOUL_PG_USERNAMELAKESOUL_PG_PASSWORD 这几个环境变量作为配置的值。

2:设置 Spark 工程作业

LakeSoul 目前支持 Spark 3.1.2 + Scala 2.12

使用 spark-shellpyspark 或者 spark-sql 交互式查询, 需要添加 LakeSoul 的依赖和配置,有两种方法:

使用 --packages 传 Maven 仓库和包名

spark-shell --packages com.dmetasoul:lakesoul-spark:2.1.1-spark-3.1.2

使用打包好的 LakeSoul 包

可以从 Releases 页面下载已经打包好的 LakeSoul Jar 包。下载 jar 并传给 spark-submit 命令:

spark-submit --jars "lakesoul-spark-2.1.1-spark-3.1.2.jar"

将 Jar 包放在 Spark 环境中:将 Jar 包下载后,放在 $SPARK_HOME/jars 中

maven依赖

<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul-spark</artifactId>
    <version>2.1.1-spark-3.1.2</version>
</dependency>

Spark 作业设置 lakesoul_home 环境变量

export lakesoul_home=/path/to/lakesoul.properties
  • 对于 Hadoop Yarn 集群, 增加命令行参数 --conf spark.yarn.appMasterEnv.lakesoul_home=lakesoul.properties --files /path/to/lakesoul.properties
  • 对于 K8s 集群,增加命令行参数 --conf spark.kubernetes.driverEnv.lakesoul_home=lakesoul.properties --files /path/to/lakesoul.properties to spark-submit command.

设置 Spark SQL Extension

LakeSoul 通过 Spark SQL Extension 机制来实现一些查询计划改写的扩展,需要为 Spark 作业增加以下配置:

spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension

设置 Spark 的 Catalog

LakeSoul 实现了 Spark 3 的 CatalogPlugin 接口,可以作为独立的 Catalog 插件让 Spark 加载。在 Spark 作业中增加如下配置:

spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

该配置增加了一个名为 lakesoul 的 Catalog。为了方便 SQL 中使用,也可以将该 Catalog 设置为默认的 Catalog:

spark.sql.defaultCatalog=lakesoul

通过如上配置,默认会通过 LakeSoul Catalog 来查找所有 database 和表。如果需要同时访问 Hive 等外部 catalog,需要在表名前加上对应 catalog 名字。例如在 Spark 中启用 Hive 作为 Session Catalog,则访问 Hive 表时需要加上 spark_catalog 前缀。

spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

从 2.1.0 起 LakeSoul 的 Catalog 更改为非 session 的实现。你仍然可以将 LakeSoul 设置为 Session Catalog,即设置名为 spark_catalog ,但是这样就无法再访问到 Hive 表。

自 2.0 版本起,LakeSoul 支持将 Compaction 后的目录路径,挂载到指定的 Hive 表,指定和 LakeSoul 分区名一致和自定义分区名两种功能。该功能可以方便下游一些只能支持访问 Hive 的系统读取到 LakeSoul 的数据。更推荐的方式是通过 Kyuubi 来支持 Hive JDBC,这样可以直接使用 Hive JDBC 调用 Spark 引擎来访问 LakeSoul 表,包括 Merge on Read 读取。

lakeSoulTable.compaction("date='2021-01-02'", "spark_catalog.default.hive_test_table", "date='20210102'")

3:启动SparkShell

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog

4:创建lakesoul表

我们创建一个 LakeSoul 表 MysqlCdcTest,这个表会准实时流批一体地同步 MySQL 的数据。这个表同样使用 id 列作为主键,用 "op" 列表示CDC的更新。并且我们需要通过 lakesoul_cdc_change_column 这个表属性,指定 LakeSoul 表中,表示 CDC 状态更新的列名,这个示例中该列的名字为 "op"。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()

5:启动 streaming 写入 LakeSoul读取 kafka,转换 Debezium 读取出的 json 格式,使用lakesoul upsert更新 LakeSoul 表:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
var strList = List.empty[String]
//js1 是示例数据,我们这里也用于生成schema,在下文from_json函数中转换数据使用,before和after中内容对应于mysql表字段
val js1 = """{
        |  "before": {
        |    "id": 2,
        |    "rangeid": 2,
        |    "value": "sms"
        |  },
        |  "after": {
        |    "id": 2,
        |    "rangeid": 2,
        |    "value": "sms"
        |  },
        |  "source": {
        |    "version": "1.8.0.Final",
        |    "connector": "mysql",
        |    "name": "cdcserver",
        |    "ts_ms": 1644461444000,
        |    "snapshot": "false",
        |    "db": "cdc",
        |    "sequence": null,
        |    "table": "sms",
        |    "server_id": 529210004,
        |    "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
        |    "file": "binlog.000033",
        |    "pos": 54831329,
        |    "row": 0,
        |    "thread": null,
        |    "query": null
        |  },
        |  "op": "c",
        |  "ts_ms": 1644461444777,
        |  "transaction": null
        |}""".stripMargin
strList = strList :+ js1
val rddData = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
val sche = resultDF.schema
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
//对接kafka 需要指定kafka.bootstrap.server ip地址和debezium输出到kafka的topic
val kfdf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafkahost:9092")
.option("subscribe", "cdcserver.cdc.test")
.option("startingOffsets", "latest")
.load()
//解析debezium中产生的json,对于Mysql insert、update、delete操作会自动生成一列,列名op
val kfdfdata = kfdf
.selectExpr("CAST(value AS STRING) as value")
.withColumn("payload", from_json($"value", sche))
.filter("value is not null")
.drop("value")
.select("payload.after", "payload.before", "payload.op")
.withColumn(
    "op",
    when($"op" === "c", "insert")
    .when($"op" === "u", "update")
    .when($"op" === "d", "delete")
    .otherwise("unknown")
)
.withColumn(
    "data",
    when($"op" === "insert" || $"op" === "update", $"after")
    .when($"op" === "delete", $"before")
)
.drop($"after")
.drop($"before")
.select("data.*", "op")
//使用lakesoul upsert更新表中数据并在屏幕上输出解析后的数据
kfdfdata.writeStream
.foreachBatch { (batchDF: DataFrame, _: Long) =>
    {
    lakeSoulTable.upsert(batchDF)
    batchDF.show
    }
}
.start()
.awaitTermination()

flink cdc同步到LakeSoul

LakeSoul 自 2.1.0 版本起,实现了 Flink CDC Sink,能够支持 Table API 及 SQL (单表),以及 Stream API (整库多表)。目前支持的上游数据源为 MySQL(5.6-8.0)
1:下载 LakeSoul Flink Jar,lakesoul-flink-2.1.1-flink-1.14.jar

2:增加 LakeSoul 元数据库配置

$FLINK_HOME/conf/flink-conf.yaml 中增加如下配置:

containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://192.168.24.180:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://192.168.24.180:5432/test_lakesoul_meta?stringtype=unspecified

注意这里 master 和 taskmanager 的环境变量都需要设置。

注意如果使用 Session 模式来启动作业,即将作业以 client 方式提交到 Flink Standalone Cluster,则 flink run 作为 client,是不会读取上面配置,因此需要再单独配置环境变量,即:

export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root

3:启动同步作业

bin/flink run -c org.apache.flink.lakesoul.entry.MysqlCdc \
    lakesoul-flink-2.1.1-flink-1.14.jar \
    --source_db.host localhost \
    --source_db.port 3306 \
    --source_db.db_name default \
    --source_db.user root \
    --source_db.password root \
    --source.parallelism 4 \
    --sink.parallelism 4 \
    --server_time_zone=Asia/Shanghai
    --warehouse_path s3://bucket/lakesoul/flink/data \
    --flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
    --flink.savepoint s3://bucket/lakesoul/flink/savepoints

4: datastream代码

package org.apache.flink.lakesoul.entry;
import com.dmetasoul.lakesoul.meta.external.mysql.MysqlDBManager;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.BinarySourceRecord;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashSet;
import java.util.List;
import static org.apache.flink.lakesoul.tool.JobOptions.*;
import static org.apache.flink.lakesoul.tool.LakeSoulDDLSinkOptions.*;
public class MysqlCdc2LakeSoul {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String dbName = parameter.get(SOURCE_DB_DB_NAME.key());
        String userName = parameter.get(SOURCE_DB_USER.key());
        String passWord = parameter.get(SOURCE_DB_PASSWORD.key());
        String host = parameter.get(SOURCE_DB_HOST.key());
        int port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
        String databasePrefixPath = parameter.get(WAREHOUSE_PATH.key());
        String serverTimezone = parameter.get(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue());
        int sourceParallelism = parameter.getInt(SOURCE_PARALLELISM.key());
        int bucketParallelism = parameter.getInt(BUCKET_PARALLELISM.key());
        int checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(),
                                                  JOB_CHECKPOINT_INTERVAL.defaultValue());     //mill second
        MysqlDBManager mysqlDBManager = new MysqlDBManager(dbName,
                                                           userName,
                                                           passWord,
                                                           host,
                                                           Integer.toString(port),
                                                           new HashSet<>(),
                                                           databasePrefixPath,
                                                           bucketParallelism,
                                                           true);
        mysqlDBManager.importOrSyncLakeSoulNamespace(dbName);
        //syncing mysql tables to lakesoul
        List<String> tableList = mysqlDBManager.listTables();
        if (tableList.isEmpty()) {
            throw new IllegalStateException("Failed to discover captured tables");
        }
        tableList.forEach(mysqlDBManager::importOrSyncLakeSoulTable);
        Configuration conf = new Configuration();
        // parameters for mutil tables ddl sink
        conf.set(SOURCE_DB_DB_NAME, dbName);
        conf.set(SOURCE_DB_USER, userName);
        conf.set(SOURCE_DB_PASSWORD, passWord);
        conf.set(SOURCE_DB_HOST, host);
        conf.set(SOURCE_DB_PORT, port);
        conf.set(WAREHOUSE_PATH, databasePrefixPath);
        conf.set(SERVER_TIME_ZONE, serverTimezone);
        // parameters for mutil tables dml sink
        conf.set(LakeSoulSinkOptions.USE_CDC, true);
        conf.set(LakeSoulSinkOptions.WAREHOUSE_PATH, databasePrefixPath);
        conf.set(LakeSoulSinkOptions.SOURCE_PARALLELISM, sourceParallelism);
        conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, bucketParallelism);
        StreamExecutionEnvironment env;
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool pt = ParameterTool.fromMap(conf.toMap());
        env.getConfig().setGlobalJobParameters(pt);
        env.enableCheckpointing(checkpointInterval);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(4023);
        CheckpointingMode checkpointingMode = CheckpointingMode.EXACTLY_ONCE;
        if (parameter.get(JOB_CHECKPOINT_MODE.key(), JOB_CHECKPOINT_MODE.defaultValue()).equals("AT_LEAST_ONCE")) {
            checkpointingMode = CheckpointingMode.AT_LEAST_ONCE;
        }
        env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointStorage(parameter.get(FLINK_CHECKPOINT.key()));
        conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        MySqlSourceBuilder<BinarySourceRecord> sourceBuilder = MySqlSource.<BinarySourceRecord>builder()
                                                                        .hostname(host)
                                                                        .port(port)
                                                                        .databaseList(dbName) // set captured database
                                                                        .tableList(dbName + ".*") // set captured table
                                                                        .serverTimeZone(serverTimezone)  // default -- Asia/Shanghai
                                                                        .username(userName)
                                                                        .password(passWord);
        LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
        context.env = env;
        context.sourceBuilder = sourceBuilder;
        context.conf = conf;
        LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(context);
        DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource();
        Tuple2<DataStream<BinarySourceRecord>, DataStream<BinarySourceRecord>> streams =
                builder.buildCDCAndDDLStreamsFromSource(source);
        DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(streams.f0);
        DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
        DataStreamSink<BinarySourceRecord> ddlSink = builder.buildLakeSoulDDLSink(streams.f1);
        dmlSink.addSink(new LakeSoulMultiTablesSink())
        ddlSink.addSink(new LakeSoulDDLSink())
        env.execute("LakeSoul CDC Sink From MySQL Database " + dbName);
    }
}

5:LakeSoul Flink CDC Sink 严格一次语义保证

LakeSoul Flink CDC Sink 在作业运行过程中会自动保存相关状态,在 Flink 作业发生 Failover 时能够将状态恢复并重新写入,因此数据不会丢失。

LakeSoul 写入时,在两个部分保证写入的幂等性:

  • Stage 文件 Commit 时,与 Flink File Sink 一致,通过文件系统 rename 操作的原子性,来保证 staging 文件写入到最终的路径。因为 rename 是原子的,Failover 之后不会发生重复写入或缺失的情况。
  • LakeSoul 元数据提交时,会首先记录文件路径,在更新 snapshot 时会通过事务标记该文件已提交。Failover 后,通过判断一个文件是否已经提交,可以保证提交的幂等性。

综上,LakeSoul Flink CDC Sink 通过状态恢复保证数据不丢失,通过提交幂等性保证数据不重复,实现了严格一次(Exactly Once)语义保证。

 

数据更新和合并(upsert/merge)

LakeSoul可以支持对已经入湖的数据做部分字段更新功能,而不必将整张数据表全部覆盖重写,避免这种繁重且浪费资源的操作。

1:部分字段更新

举个例子一张表数据信息如下,id为主键(即hashPartitions),目前需要根据主键字段,对phone_number做字段修改处理。

可以使用 upsert 来实现对任意行中任意一个字段的更新。upsert需要包含主键 (id) 和需要修改的 address 信息,再次读取整张表数据 address 便可展示为修改后的字段信息。

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val df = Seq(("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),("2", "Make", "13511110000", "address_2", "job_2", "company_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("1", "address_1_1")).toDF("id","address")
lakeSoulTable.upsert(extraDF)
lakeSoulTable.toDF.show()


2: 自定义Merge合并功能

LakeSoul 默认 merge 规则,即数据更新后取最后一条记录作为该字段数据 (org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.DefaultMergeOp)。在此基础上,LakeSoul 内置扩展了几种数据 merge 逻辑,对 Int/Long 字段做加和 merge(MergeOpInt/MergeOpLong)、对非空字段更新 (MergeNonNullOp)、以","拼接字符串 merge 方式。

下面以对非空字段更新 (MergeNonNullOp) 为例,借用上面表格数据样例。数据写入时同样以 upsert 方式进行更新写入,然后在数据读取时需要注册 merger 逻辑,然后进行读取即可。

import org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeNonNullOp
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val df = Seq(("1", "Jake", "13700001111", "address_1", "job_1", "company_1"),("2", "Make", "13511110000", "address_2", "job_2", "company_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("1", "null", "13100001111", "address_1_1", "job_1_1", "company_1_1"),("2", "null", "13111110000", "address_2_2", "job_2_2", "company_2_2"))
  .toDF("id", "name", "phone_number", "address", "job", "company")
new MergeNonNullOp().register(spark, "NotNullOp")
lakeSoulTable.toDF.show()
lakeSoulTable.upsert(extraDF)
lakeSoulTable.toDF.withColumn("name", expr("NotNullOp(name)")).show()


用户也可以通过自定义 MergeOperator (实现 trait org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.MergeOperator) 来自定义 Merge 时的逻辑,能够灵活地实现数据高效入湖。

 

多流合并构建宽表

为构建宽表,传统数仓的 ETL 在做多表关联时,需要根据主外键多次 join,然后构建一个大宽表。当数据量较多或需要多次join时,会有效率低下,内存消耗大,容易 OOM 等问题,且 Shuffle 过程占据大部分数据交换时间,效率也很低下。LakeSoul 支持对数据进行 Upsert,并支持自定义 MergeOperator 功能,可以避免上述存在的问题,不必Join即可得到合并结果。下面针对这一场景具体举例进行说明。


假设有以下几个流的数据,A、B、C和D,各个流数据内容如下:

A:  

B:

 


C:

D:


最后需要形成一张大宽表,将四张表进行合并展示,如下:

传统意义上进行上述操作,需要将四张表根据主键(IP)进行三次join,写法如下:

Select 
       A.IP as IP,  
       A.sy as sy, 
       A.us as us, 
       B.free as free, 
       B.cache as cache, 
       C.level as level, 
       C.des as des, 
       D.qps as qps, 
       D.tps as tps 
from A join B on A.IP = B.IP 
    join C on C.IP = A.IP 
    join D on D.IP = A.IP.

LakeSoul 支持多流合并,多个流可以有不同的 Schema (需要有相同主键)。LakeSoul 可以做到自动扩展 Schema,若新写入的数据字段在原表中未存在,则会自动扩展表 schema,不存在的字段默认为null处理。通过使用 LakeSoul 多流合并功能,结合 LakeSoul 独特的 MergeOperator 功能,通过 upsert 将数据写入 LakeSoul 后,不需要 join,即可读取到拼接好的宽表。上述过程代码实现如下:

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()
import spark.implicits._
val df1 = Seq(("1.1.1.1", 30, 40)).toDF("IP", "sy", "us")
val df2 = Seq(("1.1.1.1", 1677, 455)).toDF("IP", "free", "cache")
val df3 = Seq(("1.1.1.2", "error", "killed")).toDF("IP", "level", "des")
val df4 = Seq(("1.1.1.1", 30, 40)).toDF("IP", "qps", "tps")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
df1.write
  .mode("append")
  .format("lakesoul")
  .option("hashPartitions","IP")
  .option("hashBucketNum","2")
  .save(tablePath)
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(df2)
lakeSoulTable.upsert(df3)
lakeSoulTable.upsert(df4)
lakeSoulTable.toDF.show()


 

kafka多topic入LakeSoul

 

通过 LakeSul Kafka Stream 将 Kafka 中的数据同步到 LakeSul 非常方便。

LakeSoul Kafka Stream 可以支持自动创建表,自动识别新 topic,exactly-once 语义、自动为表添加分区等功能。

LakeSoul Kafka Stream 主要使用 Spark Structured Streaming 来实现数据同步功能。

使用 LakeSoul Kafka Stream需要以下条件之一:

       1:topic 中的数据为 json 格式;

       2:Kafka 集群带有 Schema Registry 服务

1. 准备环境

你可以编译 LakeSoul 项目以获取 LakeSoul Kafka Stream jar, 或者可以通过 https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-kafka-stream.tar.gz 来获取 LakeSoul Kafka Stream 以及其他任务运行依赖的jar包。

下载后解压 tar 包,然后将 jar 包放入 $SPARK_HOME/jars 目录下,或者在提交任务时添加依赖的jar,比如通过 --jars。

2. 启动 LakeSoul Kafka Stream 任务

  1. 任务启动时通过 lakesoul_home 环境变量添加元数据库信息. 这部分请参考 搭建本地测试环境
  2. 提交任务。你需要按顺序填写一些参数,以确保任务能够准确运行。参数描述如下:


3. 任务流程示例

1. 假设 Kafka 集群已经存在。在这里,通过 Docker Compose 运行 Kafka 集群。然后创建一个名为 "test" 的主题并向其中写入一些数据。Kafka bootstrap.servers: localhost:9092

# 创建 topic 'test'
bin# ./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# 查看 topic 列表
bin# ./kafka-topics.sh  --list --bootstrap-server localhost:9092
test
# 向 名为 'test' 的topic中写入一些数据
bin# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>{"before":{"id":1,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":1}
>{"before":{"id":2,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":2}
>{"before":{"id":3,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":3}
>{"before":{"id":4,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":4}
>{"before":{"id":5,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":5}

2. 提交 Kafka Stream 任务. 将上述下载的依赖 jars 放到 $SPARK_HOME/jars 目录下.

export lakesoul_home=./pg.properties && ./bin/spark-submit \
--class org.apache.spark.sql.lakesoul.kafka.KafkaStream \
--driver-memory 4g \
--executor-memory 4g \
--master local[4] \
./jars/lakesoul-spark-2.1.1-spark-3.1.2-SNAPSHOT.jar \
localhost:9092 test.* /tmp/kafka/data /tmp/kafka/checkpoint/ kafka earliest false

3. 通过 spark-shell 查看写入 LakeSoul 中的数据

scala> import com.dmetasoul.lakesoul.tables.LakeSoulTable
import com.dmetasoul.lakesoul.tables.LakeSoulTable
scala> val tablepath="/tmp/kafka/data/kafka/test"
tablepath: String = /tmp/kafka/data/kafka/test
scala> val lake = LakeSoulTable.forPath(tablepath)
lake: com.dmetasoul.lakesoul.tables.LakeSoulTable = com.dmetasoul.lakesoul.tables.LakeSoulTable@585a2ad9
scala> lake.toDF.show(false)
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|after                             |before                            |op |source                                                                                                                                                                                                                                                                                                 |transaction|ts_ms        |
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|{"id":2,"rangeid":2,"value":"sms"}|{"id":1,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|1          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":3,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|3          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":5,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|5          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":2,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|2          |1644461444777|
|{"id":2,"rangeid":2,"value":"sms"}|{"id":4,"rangeid":2,"value":"sms"}|c  |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|4          |1644461444777|
+----------------------------------+----------------------------------+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+

4. 创建新的 topic 'test_1' 并向其中写入一些数据

# 创建 topic 'test_1' 
bin# ./kafka-topics.sh --create --topic test_1 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
# 查看 topic list
bin# ./kafka-topics.sh  --list --bootstrap-server localhost:9092
test
test_1
# 向 topic 'test_1' 中写入一些数据
bin# ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_1
>{"before":{"id":1,"rangeid":2,"value":"sms"},"after":{"id":1,"rangeid":1,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":1}
>{"before":{"id":2,"rangeid":2,"value":"sms"},"after":{"id":2,"rangeid":2,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":2}
>{"before":{"id":3,"rangeid":2,"value":"sms"},"after":{"id":3,"rangeid":3,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":3}
>{"before":{"id":4,"rangeid":2,"value":"sms"},"after":{"id":4,"rangeid":4,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":4}
>{"before":{"id":5,"rangeid":2,"value":"sms"},"after":{"id":5,"rangeid":5,"value":"sms"},"source":{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1644461444777,"transaction":5}

5. 创建新的 topic 后,数据同步到 Lakesoul 需要5分钟时间。5分钟后查看 LakeSoul 中的数据

scala> val tablepath_1="/tmp/kafka/data/kafka/test_1"
tablepath_1: String = /tmp/kafka/data/kafka/test_1
scala> val lake_1 = LakeSoulTable.forPath(tablepath_1)
lake: com.dmetasoul.lakesoul.tables.LakeSoulTable = com.dmetasoul.lakesoul.tables.LakeSoulTable@43900101
lake_1.toDF.show(false)
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|after                             |before                            |op  |source                                                                                                                                                                                                                                                                                                 |transaction|ts_ms        |
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+
|{"id":2,"rangeid":2,"value":"sms"}|{"id":2,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|2          |1644461444777|
|{"id":1,"rangeid":1,"value":"sms"}|{"id":1,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|1          |1644461444777|
|{"id":4,"rangeid":4,"value":"sms"}|{"id":4,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|4          |1644461444777|
|{"id":3,"rangeid":3,"value":"sms"}|{"id":3,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|3          |1644461444777|
|{"id":5,"rangeid":5,"value":"sms"}|{"id":5,"rangeid":2,"value":"sms"}|c   |{"version":"1.8.0.Final","connector":"mysql","name":"cdcserver","ts_ms":1644461444000,"snapshot":"false","db":"cdc","sequence":null,"table":"sms","server_id":529210004,"gtid":"de525a81-57f6-11ec-9b60-fa163e692542:1621099","file":"binlog.000033","pos":54831329,"row":0,"thread":null,"query":null}|5          |1644461444777|
+----------------------------------+----------------------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------------+

6. 如果 Kafka 集群使用 Schema Registry服务,提交任务时参数最后需要填写 schema registry 服务地址。

export lakesoul_home=./pg.properties && ./bin/spark-submit \
--class org.apache.spark.sql.lakesoul.kafka.KafkaStream \
--driver-memory 4g \
--executor-memory 4g \
--master local[4] \
./jars/lakesoul-spark-2.1.1-spark-3.1.2-SNAPSHOT.jar \
localhost:9092 test.* /tmp/kafka/data /tmp/kafka/checkpoint/ kafka earliest false http://localhost:8081

增删改查

LakeSoul共支持四种commit操作:mergeCommit;appendCommit;compactCommit;updateCommit,对于update操作由于历史数据每次合并并生成新文件,难以获取增量文件,因此不支持增量查询。

1.分区信息
option(LakeSoulOptions.PARTITION_DESC, "range=range1")
如果未指定分区信息,则默认针对所有分区进行增量查询。
2.起始和结束时间戳
option(LakeSoulOptions.READ_START_TIME, "2022-01-01 15:15:15")
option(LakeSoulOptions.READ_END_TIME, "2022-01-01 20:15:15")
3.读类型
option(LakeSoulOptions.READ_TYPE, "incremental")
可以指定增量读"incremental",快照读"snapshot",不指定默认全量读。

增量读支持简单的upsert场景和CDC场景下的增量读,有两种方式,一种是通过调用LakeSoulTable.forPath()函数进行查询,另一种是通过spark.read指定选项进行增量读,可以获得指定分区在起止时间范围内的增量数据,获取的增量数据时间区间为前闭后开。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
// 针对给定range和时间点,进行增量读取,incremental表示增量读
// 读取range1分区在2023-01-01 15:15:15到2023-01-01 20:15:15时间范围内的增量数据
// 第一种方式,通过forPath进行增量读
val lake1 = LakeSoulTable.forPath(tablePath, "range=range1", "2023-01-01 15:15:15", "2023-01-01 20:15:15", "incremental")
val data1 = lake1.toDF.select("range", "hash", "op").toDF().show()
// 第二种方式,通过spark.read指定选项进行增量读
val lake2 = spark.read.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
  .option(LakeSoulOptions.READ_START_TIME, "2023-01-01 15:15:15")
  .option(LakeSoulOptions.READ_END_TIME, "2023-01-01 20:15:15")
  .option(LakeSoulOptions.READ_TYPE, "incremental")
  .load(tablePath)
val data2 = lake2.toDF.select("range", "hash", "op").toDF().show()

流式读LakeSoul支持 Spark Structured Streaming read,流式读基于增量查询,通过spark.readStream指定选项进行流式读,可以获得实时数据流中指定分区下每一批次更新的增量数据。指定的起始时间需要早于实时数据的摄入时间。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
// 通过spark.readStream指定选项进行流式读,读取range1分区在2023-01-01 15:15:15及之后的增量数据,每1秒触发一次读取,将结果输出到控制台
spark.readStream.format("lakesoul")
  .option(LakeSoulOptions.PARTITION_DESC, "range=range1")
  .option(LakeSoulOptions.READ_START_TIME, "2022-01-01 15:15:15")
  .load(tablePath)
  .writeStream.format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
  .awaitTermination()

创建和写入LakeSoulTable

1:Table Name

LakeSoul 中表名可以是一个路径,数据存储的目录就是 LakeSoulTable 的表名。同时一个表可以有一个表名帮助记忆,或在SQL中访问,即不是路径形式的一个字符串。

当调用 Dataframe.write.save 方法向 LakeSoulTable 写数据时,若表不存在,则会使用存储路径自动创建新表,但是默认没有表名,只能通过路径访问,可以通过添加 option("shortTableName", "table_name") 选项来设置表名。

通过 DataFrame.write.saveAsTable,会创建表,可以通过表名访问,路径默认为 spark.sql.warehouse.dir/current_database/table_name,后续可以通过路径或表名访问。如需自定义表路径,则可以加上 option("path", "s3://bucket/...") 选项。

通过 SQL 建表时,表名可以是路径或一个表名,路径必须是绝对路径。如果是表名,则路径的规则和上面 Dataframe.write.saveAsTable 一致,可以在 CREATE TABLE SQL 中通过 LOCATION 子句设置。

2:Partition

LakeSoulTable 有两种分区方式,分别是 range 分区和 hash 分区,可以两种分区同时使用。

  • range 分区即通常的基于时间的表分区,不同分区的数据文件存储在不同的分区路径下;
  • 使用 hash 分区,必须同时指定 hash 分区主键字段和 hash bucket num,在写数据时,会根据 bucket num 对 hash 主键字段值进行散列,取模后相同数据会写到同一个文件,文件内部根据 hash 字段值升序排列;
  • 若同时指定了 range 分区和 hash 分区,则每个 range 分区内,hash 值相同的数据会写到同一个文件里;
  • 指定分区后,写入 LakeSoulTable 的数据必须包含分区字段。

可以根据具体场景选择使用 range 分区或 hash 分区,或者同时使用两者。当指定 hash 分区后,LakeSoulTable 的数据将根据主键唯一,主键字段为 hash 分区字段 + range 分区字段(如果存在)。

当指定 hash 分区时,LakeSoulTable 支持 upsert 操作 (scala/sql),此时 append 模式写数据被禁止,可以使用 LakeSoulTable.upsert() 方法或者 MERGE INTO SQL 语句。

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  // 使用 SQL 功能还需要增加以下两个配置项
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//create table
//spark batch
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
//spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minutes"))
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum", "2")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start(tablePath)
writeStream.awaitTermination()
//对于已存在的表,写数据时不需要再指定分区信息
//相当于 insert overwrite partition,如果不指定 replaceWhere,则会重写整张表
df.write
  .mode("overwrite")
  .format("lakesoul")
  .option("replaceWhere","date='2021-01-01'")
  .save(tablePath)

Read LakeSoulTable可以通过 Spark read api 或者构建 LakeSoulTable 来读取数据,LakeSoul 也支持通过 Spark SQL 读取数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//方法一
val df1 = spark.read.format("lakesoul").load(tablePath)
//方法二
val df2 = LakeSoulTable.forPath(tablePath).toDF

Upsert LakeSoulTable

Batch

当 LakeSoulTable 使用 hash 分区时,支持 upsert 功能。

默认情况下使用 MergeOnRead 模式,upsert 数据以 delta file 的形式写入表路径,LakeSoul 提供了高效的 upsert 和 merge scan 性能。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("2021-01-01",3,"chicken")).toDF("date","id","name")
lakeSoulTable.upsert(extraDF)

Streaming(见上文,流式读)

Update LakeSoulTableLakeSoul 支持 update 操作,通过指定条件和需要更新的字段 expression 来执行。有多种方式可以执行 update,详见 LakeSoulTable 注释。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
import org.apache.spark.sql.functions._
//update(condition, set)
lakeSoulTable.update(col("date") > "2021-01-01", Map("data" -> lit("2021-01-02")))

Delete Data

LakeSoul 支持 delete 操作删除符合条件的数据,条件可以是任意字段,若不指定条件,则会删除全表数据。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除符合条件的数据
lakeSoulTable.delete("date='2021-01-01'")
//删除全表数据
lakeSoulTable.delete()

Compaction

执行 upsert 会生成 delta 文件,当 delta 文件过多时,会影响读取效率,此时可以执行 compaction 合并文件。

当执行全表 compaction 时,可以给 compaction 设置条件,只有符合条件的 range 分区才会执行 compaction 操作。

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//对指定分区执行 compaction 操作
lakeSoulTable.compaction("date='2021-01-01'")
//对全表所有分区执行 compaction 操作
lakeSoulTable.compaction()
//对全表所有分区执行 compaction 操作,会检测是否符合执行 compaction 的条件,只有符合条件的才会执行
lakeSoulTable.compaction(false)

Compaction 后挂载到 Hive Meta

自 2.0 版本起,LakeSoul 支持将 Compaction 后的目录路径,挂载到指定的 Hive 表,并保持所有 Range 分区名不变和自定义分区名功能。该功能可以方便下游一些只能支持访问 Hive 的系统读取到 LakeSoul 的数据。更推荐的方式是通过 Kyuubi 来支持 Hive JDBC,这样可以直接使用 Hive JDBC 调用 Spark 引擎来访问 LakeSoul 表,包括 Merge on Read 读取

要使用 LakeSoul 导出分区到 Hive Meta 的功能,保持 hive 分区名不变,可以执行如下 Compaction 调用:

import com.dmetasoul.lakesoul.tables.LakeSoulTable
val lakeSoulTable = LakeSoulTable.forName("lakesoul_test_table")
lakeSoulTable.compaction("date='2021-01-01'", "spark_catalog.default.hive_test_table")
# 自定义 hive 分区名,可以执行如下 Compaction 调用:
lakeSoulTable.compaction("date='2021-01-02'", "spark_catalog.default.hive_test_table", "date='20210102'")

注意 如果将 LakeSoul Catalog 设置为了 Spark 默认 Catalog,则 Hive 表名前面需要加上 spark_catalog

使用 Spark SQL 操作 LakeSoulTable

LakeSoul 支持 Spark SQL 读写数据,使用时需要设置 spark.sql.catalog.lakesoulorg.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog。同时也可以将 LakeSoul 设置为默认 Catalog,即增加 spark.sql.defaultCatalog=lakesoul 配置项。需要注意的是:

DDL SQL

# 创建主键表,需要通过 TBLPROPERTIES 设置主键名和哈希分桶数,没有设置则为非主键表
# 创建主键CDC表,需要增加表属性 `'lakesoul_cdc_change_column'='change_kind'`,具体请参考 [LakeSoul CDC 表](../Usage%20Doc/04-cdc-ingestion-table.mdx)
CREATE TABLE default.table_name (id string, date string, data string) USING lakesoul
    PARTITIONED BY (date)
    LOCATION 's3://bucket/table_path'
    TBLPROPERTIES(
      'hashPartitions'='id',
      'hashBucketNum'='2')

同时也支持使用 ALTER TABLE 增加或删除列,该部分与 Spark SQL 语法相同,暂不支持修改列的类型。

DML SQL

# INSERT INTO
insert overwrite/into table default.table_name partition (date='2021-01-01') select id from tmpView
# MERGE INTO
# 对主键表,可以通过 `Merge Into` 语句来实现 Upsert
# 暂不支持 Merge Into 中 MATCHED/NOT MATCHED 带条件的语句
# ON 子句只能包含主键相等的表达式,不支持非主键列连接,不支持非相等表达式
MERGE INTO default.`table_name` AS t USING source_table AS s
    ON t.hash = s.hash
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *

注意

  • 表名前可以添加 database(namespace) 名,默认为当前 USE 的 database 名,没有执行过 USE database 则为 default
  • 可以使用 LOCATION 子句或 path 表属性来设置表路径,如果没有设置路径,则默认为 spark.sql.warehouse.dir/database_name/table_name
  • 可以使用表路径来读写一个 LakeSoul 表,在 SQL 中表名部分需要写成 lakesoul.default.table_path

Operator on Hash Primary Keys

指定 hash 分区后,LakeSoul 各 range 分区内的数据根据 hash 主键字段分片且分片数据有序,因此部分算子作用于 hash 主键字段时,无需 shuffle 和 sort。

LakeSoul 目前支持 join、intersect 和 except 算子的优化,后续将支持更多算子。

1 Join on Hash Keys

支持的场景:

  • 对于同一张表,不同分区的数据根据 hash 字段进行 join 时,无需 shuffle 和 sort
  • 若两张不同表的 hash 字段类型和字段数量相同,且 hash bucket 数量相同,它们之间根据 hash 字段进行 join 时,也无需 shuffle 和 sort

2 Intersect/Except on Hash Keys

支持的场景:

  • 对同一张表不同分区的 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct
  • 对两张不同的表,若它们拥有相同的 hash 字段类型和字段数量且 hash bucket 数量相同,对 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct

range 分区内,hash 主键字段值是唯一的,因此 intersect 或 except 的结果是不重复的,后续操作不需要再次去重,例如可以直接 count 获取不重复数据的数量,无需 count distinct

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .config("spark.sql.defaultCatalog", "lakesoul")
  .getOrCreate()
import spark.implicits._
val df1 = Seq(("2021-01-01",1,1,"rice"),("2021-01-02",2,2,"bread")).toDF("date","id1","id2","name")
val df2 = Seq(("2021-01-01",1,1,2.7),("2021-01-02",2,2,1.3)).toDF("date","id3","id4","price")
val tablePath1 = "s3a://bucket-name/table/path/is/also/table/name/1"
val tablePath2 = "s3a://bucket-name/table/path/is/also/table/name/2"
df1.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id1,id2")
  .option("hashBucketNum","2")
  .save(tablePath1)
df2.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id3,id4")
  .option("hashBucketNum","2")
  .save(tablePath2)
//join on hash keys without shuffle and sort
//相同表的不同 range 分区
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath1` where date='2021-01-02') t2
    | on t1.id1=t2.id1 and t1.id2=t2.id2
  """.stripMargin)
    .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath2` where date='2021-01-01') t2
    | on t1.id1=t2.id3 and t1.id2=t2.id4
  """.stripMargin)
  .show()
//intersect/except on hash keys without shuffle,sort and distinct
//相同表的不同 range 分区
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-02') t
  """.stripMargin)
  .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id3,id4 from lakesoul.`$tablePath2` where date='2021-01-01') t
  """.stripMargin)
  .show()

scheam演进

LakeSoul 支持 schema 演进功能,可以新增列 (分区字段无法修改)。新增列后,读取现有数据,该新增列会是 NULL。你可以通过使用 upsert 功能,为现有数据追加该新列。 

在写数据时指定 mergeSchematrue,或者启用 autoMerge 来 merge schema,新的 schema 为表原本 schema 和当前写入数据 schema 的并集。

df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  //方式一
  .option("mergeSchema","true")
  .save(tablePath)
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  //方式二
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()

Drop Partition删除分区,也就是删除 range 分区,实际上并不会真正删掉数据文件,可以使用 cleanup 功能清理失效数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除指定 range 分区
lakeSoulTable.dropPartition("date='2021-01-01'")

Drop Table删除表会直接删除表的所有 meta 数据和文件数据

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
//删除表
lakeSoulTable.dropTable()
相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4月前
|
存储 SQL 分布式计算
开源大数据比对平台设计与实践—dataCompare
开源大数据比对平台设计与实践—dataCompare
72 0
|
4月前
|
SQL 存储 大数据
某互联网大厂亿级大数据服务平台的建设和实践
某互联网大厂亿级大数据服务平台的建设和实践
69 0
|
4月前
|
SQL 大数据 关系型数据库
开源大数据比对平台(dataCompare)新版本发布
开源大数据比对平台(dataCompare)新版本发布
75 0
|
4月前
|
SQL 存储 分布式计算
从0到1介绍一下开源大数据比对平台dataCompare
从0到1介绍一下开源大数据比对平台dataCompare
128 0
|
2月前
|
监控 物联网 大数据
智慧工地管理平台系统源码基于物联网、云计算、大数据等技术
智慧工地平台APP通过对施工过程人机料法环的全面感知、互联互通、智能协同,提高施工现场的生产效率、管理水平和决策能力,实现施工管理的数字化、智能化、精益化。
57 0
|
3月前
|
SQL 分布式计算 HIVE
开源湖仓一体平台(二):Arctic(上篇)
开源湖仓一体平台(二):Arctic(上篇)
开源湖仓一体平台(二):Arctic(上篇)
|
3月前
|
存储 人工智能 运维
轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践
本文从轻喜到家的历史技术架构与痛点问题、架构升级需求与 OLAP 选型过程、最新技术架构及落地场景应用等方面,详细介绍了轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践经验。
904 0
轻喜到家基于 EMR-StarRocks 构建实时湖仓分析平台实践
|
4月前
|
SQL 存储 大数据
从0到1介绍一下开源大数据服务平台dataService
从0到1介绍一下开源大数据服务平台dataService
116 1
|
4月前
|
存储 SQL 数据挖掘
某工商信息商业查询平台基于阿里云数据库 SelectDB 版内核 Apache Doris 的湖仓一体建设实践
从传统 Lambda 架构到基于 Doris Multi-Catalog 的湖仓一体架构实践,保证了数据的准确性和实时性、高效处理和分析了大规模数据,推动信息服务行业发展创新!
某工商信息商业查询平台基于阿里云数据库 SelectDB 版内核 Apache Doris 的湖仓一体建设实践
|
2月前
|
分布式计算 DataWorks IDE
MaxCompute数据问题之忽略脏数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
47 0

热门文章

最新文章