使用 Jupyter Notebook 运行 Delta Lake 入门教程

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 因为官方教程是基于商业软件 Databricks Community Edition 构建,虽然教程中使用的软件特性都是开源 Delta Lake 版本所具备的,但是考虑到国内的网络环境,注册和使用 Databricks Community Edition 门槛较高。所以本文尝试基于开源的 Jupiter Notebook 重新构建这个教程。

作者:吴威,花名无谓,阿里巴巴高级技术专家,2008年加入阿里巴巴集团,先后在B2B和阿里云工作,一直从事大数据和分布式计算相关研究,作为主要开发和运维。人员经历了阿里内部大数据集群的上线和发展壮大,现在阿里云EMR团队,负责Spark、Hadoop等计算引擎研发。


本文的例子来自 Delta Lake 官方教程。因为官方教程是基于商业软件 Databricks Community Edition 构建,虽然教程中使用的软件特性都是开源 Delta Lake 版本所具备的,但是考虑到国内的网络环境,注册和使用 Databricks Community Edition 门槛较高。所以本文尝试基于开源的 Jupiter Notebook 重新构建这个教程。

准备一个环境安装 Spark 和 jupyter

本文基于 Linux 构建开发环境,同时使用的软件比如 conda、jupyter以及 pyspark 等都可以在 Windows 和 MacOS 上找到,理论上来说也完全可以在这两个系统上完成此教程。

假设系统已经安装 anaconda 或 miniconda,我们使用 conda 来构建开发环境,可以非常方便的安装 pyspark 和 jupyter notebook

conda create --name spark
conda activate spark

conda install pyspark 
conda install -c conda-forge jupyterlab

环境变量设置

我们在设置一些环境变量之后,就可以使用 pyspark 命令来创建 jupyter notebook 服务

export SPARK_HOME=$HOME/miniconda3/envs/spark/lib/python3.7/site-packages/pyspark

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

启动服务(注意这里的参数里指定了 Delta Lake 的 package,Spark 会帮忙自动下载依赖):

pyspark --packages io.delta:delta-core_2.11:0.5.0

接下去所有代码在 notebook 里运行

下载需要 parquet 文件

%%bash
rm -fr /tmp/delta_demo

mkdir -p /tmp/delta_demo/loans/ 

wget -O /tmp/delta_demo/loans/SAISEU19-loan-risks.snappy.parquet https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet 
ls -al /tmp/delta_demo/loans/

Delta Lake的批流处理

在这里我们进入正题,开始介绍 Delta Lake 的批流处理能力。

首先,我们通过批处理的形式创建一张 Delta Lake 表,数据来自前面我们下载的 parquet 文件,可以和方便的把一张 parquet 表转换为 Delta Lake 表:

import os
import shutil
from pyspark.sql.functions import * 

delta_path = "/tmp/delta_demo/loans_delta"

# Delete a new delta table with the parquet file
if os.path.exists(delta_path):
    print("Deleting path " + delta_path)
    shutil.rmtree(delta_path)

# Create a new delta table with the parquet file
spark.read.format("parquet").load("/tmp/delta_demo/loans") \
  .write.format("delta").save(delta_path)
print("Created a Delta table at " + delta_path)

我来查一下这张表,数据量是否正确:

# Create a view on the table called loans_delta
spark.read.format("delta").load(delta_path).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")

spark.sql("select count(*) from loans_delta").show()

Defined view 'loans_delta'
+--------+
|count(1)|
+--------+
|   14705|
+--------+

接下去我们会使用Spark Streaming流式写入这张 Delta Lake 表,同时展示 Delta Lake 的 Schema enforcement 能力(本文省略了流式写 Parquet 表的演示部分,那部分指出了 parquet 文件的不足,比如无法强制指定 Schema )

import random
from pyspark.sql.functions import *
from pyspark.sql.types import *

def random_checkpoint_dir(): 
    return "/tmp/delta_demo/chkpt/%s" % str(random.randint(0, 10000))

# User-defined function to generate random state

states = ["CA", "TX", "NY", "IA"]

@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))

# Generate a stream of randomly generated load data and append to the delta table
def generate_and_append_data_stream_fixed(table_format, table_path):
    
    stream_data = spark.readStream.format("rate").option("rowsPerSecond", 50).load() \
        .withColumn("loan_id", 10000 + col("value")) \
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
        .withColumn("addr_state", random_state()) \
        .select("loan_id", "funded_amnt", "paid_amnt", "addr_state")   # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************

    query = stream_data.writeStream \
        .format(table_format) \
        .option("checkpointLocation", random_checkpoint_dir()) \
        .trigger(processingTime="10 seconds") \
        .start(table_path)
    
    return query

启动两个流式作业:

stream_query_1 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)
stream_query_2 = generate_and_append_data_stream_fixed(table_format = "delta", table_path = delta_path)

因为 Delta Lake 的乐观锁机制,多个流可以同时写入一张表,并保证数据的完整性。

通过批处理的方式来查询一下当前表中的数据量,我们发现有数据被插入了:

spark.sql("select count(*) from loans_delta").show()
+--------+
|count(1)|
+--------+
|   17605|
+--------+

接下去我们停止所有流的写入,接下去会展示 Delta Lake 的其他特性

# Function to stop all streaming queries 
def stop_all_streams():
    # Stop all the streams
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")
    print("Deleting checkpoints")  
    shutil.rmtree("/tmp/delta_demo/chkpt/", True)
    print("Deleted checkpoints")

stop_all_streams()
Schema evolution(Schema演化)

Delta Lake 支持Schema演化,也就是说我们可以增加或改变表字段。接下去的批处理 SQL 会新增加一些数据,同时这些数据比之前的多了一个“closed”字段。我们将新的 DF 配置参数 mergeSchema 为 true 来显示指明 Delta Lake 表 Schema 的演化:

cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']

items = [
  (1111111, 1000, 1000.0, 'TX', True), 
  (2222222, 2000, 0.0, 'CA', False)
]

loan_updates = spark.createDataFrame(items, cols) \
  .withColumn("funded_amnt", col("funded_amnt").cast("int"))
  
loan_updates.write.format("delta") \
  .mode("append") \
  .option("mergeSchema", "true") \
  .save(delta_path)

来看一下插入新数据之后的表内容,新增加了 closed 字段,之前的老数据行这个字段默认为 null。

spark.read.format("delta").load(delta_path).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|      0|       1000|   182.22|        CA|  null|
|      1|       1000|   361.19|        WA|  null|
|      2|       1000|   176.26|        TX|  null|
|      3|       1000|   1000.0|        OK|  null|
|      4|       1000|   249.98|        PA|  null|
|      5|       1000|    408.6|        CA|  null|
|      6|       1000|   1000.0|        MD|  null|
|      7|       1000|   168.81|        OH|  null|
|      8|       1000|   193.64|        TX|  null|
|      9|       1000|   218.83|        CT|  null|
|     10|       1000|   322.37|        NJ|  null|
|     11|       1000|   400.61|        NY|  null|
|     12|       1000|   1000.0|        FL|  null|
|     13|       1000|   165.88|        NJ|  null|
|     14|       1000|    190.6|        TX|  null|
|     15|       1000|   1000.0|        OH|  null|
|     16|       1000|   213.72|        MI|  null|
|     17|       1000|   188.89|        MI|  null|
|     18|       1000|   237.41|        CA|  null|
|     19|       1000|   203.85|        CA|  null|
+-------+-----------+---------+----------+------+
only showing top 20 rows

新的数据行具有 closed 字段:

spark.read.format("delta").load(delta_path).filter(col("closed") == True).show()
+-------+-----------+---------+----------+------+
|loan_id|funded_amnt|paid_amnt|addr_state|closed|
+-------+-----------+---------+----------+------+
|1111111|       1000|   1000.0|        TX|  true|
+-------+-----------+---------+----------+------+

Delta Lake 表的删除操作

除了常规的插入操作,Delta Lake 还支持 update 和 delete 等功能,可以更新表格内容。下面展示删除操作,我们希望删除表格中贷款已经被完全还清的记录。下面几条命令可以简单和清晰的展示删除过程。

首先,我们看看符合条件的记录有多少条:

spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+

然后,我们执行一个 delete 命令:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, delta_path)
deltaTable.delete("funded_amnt = paid_amnt")

最后,我们看一下删除后的结果,发现符合条件的记录都已被删除:

spark.sql("SELECT COUNT(*) FROM loans_delta WHERE funded_amnt = paid_amnt").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+

版本历史和回溯

Delta Lake 还具有很强大历史版本记录和回溯功能。history()方法清晰的展示了刚才那张表的修改记录,包括最后一次 Delete 操作。

deltaTable.history().show()

+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|       operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+
|     10|2020-02-22 22:14:06|  null|    null|          DELETE|[predicate -> ["(...|null|    null|     null|          9|          null|        false|
|      9|2020-02-22 22:13:57|  null|    null|           WRITE|[mode -> Append, ...|null|    null|     null|          8|          null|         true|
|      8|2020-02-22 22:13:52|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          6|          null|         true|
|      7|2020-02-22 22:13:50|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          6|          null|         true|
|      6|2020-02-22 22:13:42|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          4|          null|         true|
|      5|2020-02-22 22:13:40|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          4|          null|         true|
|      4|2020-02-22 22:13:32|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          2|          null|         true|
|      3|2020-02-22 22:13:30|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          2|          null|         true|
|      2|2020-02-22 22:13:22|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          1|          null|         true|
|      1|2020-02-22 22:13:20|  null|    null|STREAMING UPDATE|[outputMode -> Ap...|null|    null|     null|          0|          null|         true|
|      0|2020-02-22 22:13:18|  null|    null|           WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|
+-------+-------------------+------+--------+----------------+--------------------+----+--------+---------+-----------+--------------+-------------+

如果我们希望看一下刚才删除操作前的数据表状态,可以很方便的回溯到前一个快照点,并进行再次查询(我们可以看到被删除的记录又出现了)。

previousVersion = deltaTable.history(1).select("version").collect()[0][0] - 1

spark.read.format("delta") \
  .option("versionAsOf", previousVersion) \
  .load(delta_path) \
  .createOrReplaceTempView("loans_delta_pre_delete") \

spark.sql("SELECT COUNT(*) FROM loans_delta_pre_delete WHERE funded_amnt = paid_amnt").show()

+--------+
|count(1)|
+--------+
|    5134|
+--------+

结论

本文通过 jupyter notebook 工具演示了 Delta Lake 的官方教程,你可以在原文链接末尾下载到完整的 notebook 文件。


阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,定期推送精彩案例,技术专家直播,问答区近万人Spark技术同学在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!image.png
对开源大数据和感兴趣的同学可以加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
image.png

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
5月前
|
机器学习/深度学习 安全 数据挖掘
安全地运行 Jupyter 服务
【8月更文第29天】Jupyter Notebook 是一种流行的交互式计算环境,广泛应用于数据分析、机器学习等领域。然而,随着 Jupyter 服务越来越多地被部署在网络环境中,安全问题变得日益重要。本文将介绍一些最佳实践,帮助您保护 Jupyter 服务器免受攻击和数据泄露的风险。
150 0
|
7月前
|
数据采集 机器学习/深度学习 数据可视化
使用Jupyter Notebook进行数据分析:入门与实践
【6月更文挑战第5天】Jupyter Notebook是数据科学家青睐的交互式计算环境,用于创建包含代码、方程、可视化和文本的文档。本文介绍了其基本用法和安装配置,通过一个数据分析案例展示了如何使用Notebook进行数据加载、清洗、预处理、探索、可视化以及建模。Notebook支持多种语言,提供直观的交互体验,便于结果呈现和分享。它是高效数据分析的得力工具,初学者可通过本文案例开始探索。
|
5月前
|
Python
Jupyter Notebook又一利器nbterm,在终端玩notebook!
Jupyter Notebook又一利器nbterm,在终端玩notebook!
|
5月前
|
Python
PyCharm中运行jupyter
PyCharm中运行jupyter
108 0
|
7月前
|
文字识别 异构计算 Python
关于云端Jupyter Notebook的使用过程与感想
在自学Python时,由于家庭电脑使用冲突和设备老旧,转向云端平台。体验了多个服务:1. 魔搭modelscope(最喜欢,赠送资源丰富,社区活跃),2. Colaboratory(免费GPU,但有时重启,建议用阿里云),3. Deepnote(免费环境有限,但GPT-4代码生成功能强大),4. 飞桨aistudio(适合PaddlePaddle用户),5. ModelArts(曾有免费实例,现难找)。综合来看,阿里云的稳定性与服务更优,尤其是魔搭的自动代码修正功能。对于AIGC,推荐魔搭和付费版PAI-DSW。欢迎分享更多云端Jupyter平台体验。
360 1
|
7月前
|
Python 数据挖掘 数据可视化
Python数据分析——Pandas与Jupyter Notebook
【6月更文挑战第1天】 本文探讨了如何使用Python的Pandas库和Jupyter Notebook进行数据分析。首先,介绍了安装和设置步骤,然后展示了如何使用Pandas的DataFrame进行数据加载、清洗和基本分析。接着,通过Jupyter Notebook的交互式环境,演示了数据分析和可视化,包括直方图的创建。文章还涉及数据清洗,如处理缺失值,并展示了如何进行高级数据分析,如数据分组和聚合。此外,还提供了将分析结果导出到文件的方法。通过销售数据的完整案例,详细说明了从加载数据到可视化和结果导出的全过程。最后,讨论了进一步的分析和可视化技巧,如销售额趋势、产品销售排名和区域分布,以及
279 2
|
8月前
|
Linux 数据安全/隐私保护
anaconda运行Notebook和jupyter报错resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) ValueError
anaconda运行Notebook和jupyter报错resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) ValueError
67 0
|
8月前
|
JSON 数据可视化 数据挖掘
适合数据分析的ide---Jupyter Notebook的安装使用
适合数据分析的ide---Jupyter Notebook的安装使用
129 2
|
8月前
|
Ubuntu 网络安全 数据安全/隐私保护
使用SSH隧道将Ubuntu云服务器Jupyter Notebook端口映射到本地
这样,你就成功地将Ubuntu云服务器上的Jupyter Notebook端口映射到本地,使你能够通过本地浏览器访问并使用Jupyter Notebook。
511 1
|
8月前
|
自然语言处理 数据可视化 数据挖掘
Python 的科学计算和数据分析: 解释什么是 Jupyter Notebook?
Python科学计算与数据分析中,借助`numpy`进行数值计算,`matplotlib`用于绘图。Jupyter Notebook提供交互式编程环境,支持多语言,集成各种可视化工具。其优势在于结合代码、结果和文本,提升工作效率,具备自动补全、语法高亮等特性。示例展示了导入库,生成随机数据并用`matplotlib`画正弦波图的过程。Jupyter Notebook虽便捷,但复杂任务可能需结合`scipy`、`pandas`等更多库。
89 4