使用Apache Arrow助力PySpark数据处理

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: Apache Arrow从Spark 2.3版本开始被引入,通过列式存储,zero copy等技术,JVM 与Python 之间的数据传输效率得到了大量的提升。本文主要介绍一下Apache Arrow以及Spark中的使用方法。

Apache Arrow从Spark 2.3版本开始被引入,通过列式存储,zero copy等技术,JVM 与Python 之间的数据传输效率得到了大量的提升。本文主要介绍一下Apache Arrow以及Spark中的使用方法。

列式存储简介

在介绍Spark中使用Apache Arrow之前,先简单的介绍一下Apache Arrow以及他背后的一些技术背景。
在大数据时代之前,大部分的存储引擎使用的是按行存储的形式,很多早期的系统,如交易系统、ERP系统等每次处理的是增、删、改、查某一个实体的所有信息,按行存储的话能够快速的定位到单个实体并进行处理。如果使用列存储,对某一个实体的不同属性的操作就需要进行多次随机读写,效率将会是非常差的。
随着大数据时代的到来,尤其是数据分析的不断发展,任务不需要一次读取实体的所有属性,而只关心特定的某些属性,并对这些属性进行aggregate等复杂的操作等。这种情况下行存储将需要读取额外的数据,形成瓶颈。而选择列存储将会减少额外数据的读取,对相同属性的数据还可以进行压缩,大大的加快了处理速度。
以下是行存储和列存储的对比说明,摘自Apache Arrow 官网,上面是一个二维表,由三个属性组成,分别是session_id, timestamp和source_ip。左侧为行存储在内存中的表示,数据按行依次存储,每一行按照列的顺序存储。右侧为列存储在内存中的表示,每一列单独存放,根据batch size等属性来控制一次写入的列簇大小。这样当查询语句只涉及少数列的时候,比如图中SQL查询,只需要过滤session_id列,避免读取所有数据列,减少了大量的I/O损耗,同时考虑到CPU pipeline以及使用CPU SIMD技术等等,将大大的提升查询速度。
image

Apache Arrow

在大数据领域,列式存储的灵感来自Google于2010年发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。这篇论文影响了Hadoop生态系统发展,之后的Apache Parquet和Apache ORC作为列式存储格式已经被广大的Hadoop生态系统使用,如Spark、Hive、Impala等等。
Apache Arrow在官网上是这样定义的,Apache Arrow是一个跨语言、跨平台的内存数据结构。从这个定义中我们可以看到Apache Arrow与Apache Parquet以及Apache ORC的区别。Parquet与ORC设计的目的针对磁盘数据,在列存储的基础上使用了高效率的压缩算法进行压缩,比如使用snappy、gzip和zlib等算法对列数据进行压缩。所以大部分情况下在数据读取的时候需要首先对数据进行反压缩,并有一定的cpu使用损耗。而Arrow,作为在内存中的数据,并不支持压缩(当然写入磁盘是支持压缩的),Arrow使用dictionary-encoded来进行类似索引的操作。
除了列存储外,Arrow在数据在跨语言的数据传输上具有相当大的威力,Arrow的跨语言特性表示在Arrow的规范中,作者指定了不同数据类型的layout,包括不同原始数据类型在内存中占的比特数,Array数据的组成以及Null值的表示等等。根据这些定义后,在不同的平台和不同的语言中使用Arrow将会采用完全相同的内存结构,因此在不同平台间和不同语言间进行高效数据传输成为了可能。在Arrow之前如果要对不同语言数据进行传输必须要使用序列化与反序列化技术来完成,耗费了大量的CPU资源和时间,而Arrow由于根据规范在内存中的数据结构一致,可以通过共享内存, 内存映射文件等技术来共享Arrow内存结构,省去了序列化与反序列过程。

Spark与Apache Arrow

介绍完Arrow的背景后,来看一下Apache Spark如何使用Arrow来加速PySpark处理的。一直以来,使用PySpark的客户都在抱怨python的效率太低,导致了很多用户转向了使用Scala进行开发。这主要是由于Spark使用Scala语言开发,底层启动的是JVM,而PySpark是Scala中PythonRDD对象启动的一个Python子进程,Python与JVM的通信使用了Py4J, 通过Py4J Python程序能够动态的访问JVM中的Java对象,这一过程使用了linux pipe,在底层JVM需要对RDD进行序列化,在Python端需要对RDD进行反序列化,当数据量较大的时候效率远不如直接使用Scala。流程如下图。
image_pyspark
很多数据科学家以及分析人员习惯使用python来进行处理,尤其是使用Pandas和Numpy库来对数据进行后续处理,Spark 2.3以后引入的Arrow将会大大的提升这一效率。我们从代码角度来看一下实现,在Spark 2.4版本的dataframe.py代码中,toPandas的实现为:

          if use_arrow:
                try:
                    from pyspark.sql.types import _check_dataframe_convert_date, \
                        _check_dataframe_localize_timestamps
                    import pyarrow
                    batches = self._collectAsArrow()
                    if len(batches) > 0:
                        table = pyarrow.Table.from_batches(batches)
                        pdf = table.to_pandas()
                        pdf = _check_dataframe_convert_date(pdf, self.schema)
                        return _check_dataframe_localize_timestamps(pdf, timezone)
                    else:
                        return pd.DataFrame.from_records([], columns=self.columns)
                except Exception as e:
                    # We might have to allow fallback here as well but multiple Spark jobs can
                    # be executed. So, simply fail in this case for now.
                    msg = (
                        "toPandas attempted Arrow optimization because "
                        "'spark.sql.execution.arrow.enabled' is set to true, but has reached "
                        "the error below and can not continue. Note that "
                        "'spark.sql.execution.arrow.fallback.enabled' does not have an effect "
                        "on failures in the middle of computation.\n  %s" % _exception_message(e))
                    warnings.warn(msg)
                    raise

如果使用了Arrow(Spark 2.4默认使用),比较重要的一行是_collectAsArrow(),_collectAsArrow()实现为:

    def _collectAsArrow(self):
        """
        Returns all records as a list of ArrowRecordBatches, pyarrow must be installed
        and available on driver and worker Python environments.

        .. note:: Experimental.
        """
        with SCCallSiteSync(self._sc) as css:
            sock_info = self._jdf.collectAsArrowToPython()
        return list(_load_from_socket(sock_info, ArrowStreamSerializer()))

这里面使用了ArrowStreamSerializer(),而ArrowStreamSerializer定义为

class ArrowStreamSerializer(Serializer):
    """
    Serializes Arrow record batches as a stream.
    """

    def dump_stream(self, iterator, stream):
        import pyarrow as pa
        writer = None
        try:
            for batch in iterator:
                if writer is None:
                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                writer.write_batch(batch)
        finally:
            if writer is not None:
                writer.close()

    def load_stream(self, stream):
        import pyarrow as pa
        reader = pa.open_stream(stream)
        for batch in reader:
            yield batch

    def __repr__(self):
        return "ArrowStreamSerializer"

可以看出在这里面,jvm对数据根据Arrow规范设置好内存数据结构进行列式转化后,Python层面并不需要任何的反序列过程,而是直接读取,这也是Arrow高效的原因之一。
对比看那一下如果不使用Arrow方法为:

 def collect(self):
        """Returns all the records as a list of :class:`Row`.

        >>> df.collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        """
        with SCCallSiteSync(self._sc) as css:
            sock_info = self._jdf.collectToPython()
        return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

序列化方法为PickleSerializer,需要对每一条数据使用PickleSerializer进行反序列化。

那如何通过这一特性来进行我们的开发呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通过Arrow将JVM里面的Spark DataFrame传输给Python生成pandas DataFrame,并执行用于定义的UDF。目前有两种类型,一种是Scalar,一种是Grouped Map。
这里主要介绍一下Scalar Python UDFs的使用,以及可能的场景。 Scalar Python UDFs可以在select和withColumn中使用,他的输入参数为pandas.Series类型,输出参数为相同长度的pandas.Series。Spark内部会通过Arrow将列式数据根据batch size获取后,批量的将数据转化为pandas.Series类型,并在每个batch都执行用户定义的function。最后将不同batch的结果进行整合,获取最后的数据结果。
以下是官网的一个例子:

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

首先定义udf,multiply_func,主要功能就是将a、b两列的数据对应行数据相乘获取结果。然后通过pandas_udf装饰器生成Pandas UDF。最后使用df.selecct方法调用Pandas UDF获取结果。这里面要注意的是pandas_udf的输入输出数据是向量化数据,包含了多行,可以根据spark.sql.execution.arrow.maxRecordsPerBatch来设置。
可以看出Pandas UDF使用非常简单,只需要定义好Pandas UDF就可以了。有了Pandas UDF后我们可以很容易的将深度学习框架和Spark进行结合,比如在UDF中使用一些深度学习框架,比如scikit-learn,我们可以对批量的数据分别进行训练。下面是一个简单的例子,利用Pandas UDF来进行训练:

# Load necessary libraries
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
import pandas as pd
from scipy.optimize import leastsq
import numpy as np

# Create the schema for the resulting data frame
schema = StructType([StructField('ID', LongType(), True),
                     StructField('p0', DoubleType(), True),
                     StructField('p1', DoubleType(), True)])
# Define the UDF, input and outputs are Pandas DFs
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def analyze_player(sample_pd):
    # return empty params in not enough data
    if (len(sample_pd.shots) <= 1):
        return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                                   'p0': [ 0 ], 'p1': [ 0 ]})
     
    # Perform curve fitting     
    result = leastsq(fit, [1, 0], args=(sample_pd.shots, 
                                  sample_pd.hits))
    # Return the parameters as a Pandas DF 
    return pd.DataFrame({'ID': [sample_pd.player_id[0]], 
                       'p0': [result[0][0]], 'p1': [result[0][1]]})
# perform the UDF and show the results 
player_df = df.groupby('player_id').apply(analyze_player)
display(player_df)

除此之外还可以使用TensorFlow和MXNet等与Spark进行融合,近期阿里云EMR Data Science集群将会推出相应的功能,整合EMR Spark与深度学习框架之间调度与数据交换功能,希望大家关注。

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
8月前
|
分布式计算 大数据 数据处理
Apache Spark:提升大规模数据处理效率的秘籍
【4月更文挑战第7天】本文介绍了Apache Spark的大数据处理优势和核心特性,包括内存计算、RDD、一站式解决方案。分享了Spark实战技巧,如选择部署模式、优化作业执行流程、管理内存与磁盘、Spark SQL优化及监控调优工具的使用。通过这些秘籍,可以提升大规模数据处理效率,发挥Spark在实际项目中的潜力。
597 0
|
8月前
|
SQL Java 数据库连接
Apache Doris 支持 Arrow Flight SQL 协议,数据传输效率实现百倍飞跃
近年来,随着数据科学、数据湖分析等场景的兴起,对数据读取和传输速度提出更高的要求。而 JDBC/ODBC 作为与数据库交互的主流标准,在应对大规模数据读取和传输时显得力不从心,无法满足高性能、低延迟等数据处理需求。为提供更高效的数据传输方案,Apache Doris 在 2.1 版本中基于 Arrow Flight SQL 协议实现了高速数据传输链路,使得数据传输性能实现百倍飞跃。
|
5月前
|
消息中间件 Java Kafka
|
5月前
|
消息中间件 传感器 数据处理
"揭秘实时流式计算:低延迟、高吞吐量的数据处理新纪元,Apache Flink示例带你领略实时数据处理的魅力"
【8月更文挑战第10天】实时流式计算即时处理数据流,低延迟捕获、处理并输出数据,适用于金融分析等需即时响应场景。其框架(如Apache Flink)含数据源、处理逻辑及输出目标三部分。例如,Flink可从数据流读取信息,转换后输出。此技术优势包括低延迟、高吞吐量、强容错性及处理逻辑的灵活性。
109 4
|
5月前
|
存储 运维 数据处理
Apache Paimon:重塑阿里智能引擎数据处理新纪元,解锁高效存储与实时分析潜能!
【8月更文挑战第2天】探索 Apache Paimon 在阿里智能引擎的应用场景
229 2
|
5月前
|
机器学习/深度学习 分布式计算 数据处理
|
7月前
|
消息中间件 Kafka 数据处理
Apache Flink:流式数据处理的强大引擎
【6月更文挑战第8天】Apache Flink是开源的流处理框架,专注于高效、低延迟的无界和有界数据流处理。它提供统一编程模型,支持实时与批量数据。核心概念包括DataStreams、DataSets、时间语义和窗口操作。使用Flink涉及环境设置、数据源配置(如Kafka)、数据转换(如map、filter)、窗口聚合及数据输出。通过丰富API和灵活时间语义,Flink适于构建复杂流处理应用,在实时数据处理领域具有广阔前景。
|
8月前
|
存储 分布式计算 Apache
✨[hadoop3.x]新一代的存储格式Apache Arrow(四)
✨[hadoop3.x]新一代的存储格式Apache Arrow(四)
113 1
|
分布式计算 Hadoop Java
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
Hadoop生态系统中的流式数据处理技术:Apache Flink和Apache Spark的比较
|
消息中间件 存储 分布式计算
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用
Hadoop生态系统中的实时数据处理技术:Apache Kafka和Apache Storm的应用

推荐镜像

更多