PySpark|比RDD更快的DataFrame

简介: PySpark基础数据结构讲解

 DataFrame介绍

DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。如果你了解过pandas中的DataFrame,千万不要把二者混为一谈,二者从工作方式到内存缓存都是不同的。

DataFrame的作用

对于Spark来说,引入DataFrame之前,Python的查询速度普遍比使用RDD的Scala查询慢(Scala要慢两倍),通常情况下这种速度的差异来源于Python和JVM之间的通信开销。具体的时间差异如下图所示:

image.png

由上图可以看到,使用了DataFrame(DF)之后,Python的性能得到了很大的改进,对于SQL、R、Scala等语言的性能也会有很大的提升。

创建DataFrame

上一篇中我们了解了如何创建RDD,在创建DataFrame的时候,我们可以直接基于RDD进行转换。示例操作如下

    • spark.read.json()

    生成RDD:

    stringJSONRDD=sc.parallelize(("""   { "id": "123",    "name": "Katie",    "age": 19,    "eyeColor": "brown"  }""",
    """{    "id": "234",    "name": "Michael",    "age": 22,    "eyeColor": "green"  }""", 
    """{    "id": "345",    "name": "Simone",    "age": 23,    "eyeColor": "blue"  }""")
    )

    image.gif

    转换成DataFrame:

    swimmersJSON=spark.read.json(stringJSONRDD)

    image.gif

      • createOrReplaceTempView()

      我们可以使用该函数进行临时表的创建。

      swimmersJSON.createOrReplaceTempView("swimmersJSON")

      image.gif

      DataFrame查询

      我们可以使用DataFrame的API或者使用DataFrame的SQL查询。

        • show()

        使用show(n)方法,可以把前n行打印到控制台上(默认显示前十行)。

        swimmersJSON.show()

        image.gif

          • collect

          使用collect可以返回行对象列表的所有记录。

          swimmersJSON.collect()

          image.gif

            • SQL查询

            我们可以通过写SQL语句的形式对表格进行查询。

            spark.sql("select * from swimmersJSON").collect()

            image.gif

            DF和RDD的交互操作

              • printSchema()

              该方法可以用来打印出每个列的数据类型,我们称之为打印模式。

              swimmersJSON.printSchema()

              image.gif

                • StructType()

                该方法可以用于编程指定的模式。

                schema=StructType([
                StructField("id", LongType(), True),    
                StructField("name", StringType(), True),
                StructField("age", LongType(), True),
                StructField("eyeColor", StringType(), True)
                ])

                image.gif

                  • createDataFrame(XXRDD, schema)

                  该方法用于应用指定的schema模式并创建RDD。

                  swimmers=spark.createDataFrame(stringCSVRDD, schema)

                  image.gif

                  利用DataFrame API进行查询

                    • count()

                    用于得到DataFrame的行数。

                    swimmers.count()

                    image.gif

                      • 运行筛选语句

                      我们可以使用filter子句运行筛选语句,用select子句来指定要返回的列。

                      方法1:

                      swimmers.select("id", "age").filter("age = 22").show()

                      image.gif

                      方法2:

                      swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

                      image.gif

                      总结

                      image.png


                      相关文章
                      |
                      SQL 分布式计算 数据挖掘
                      197 Spark DataFrames概述
                      197 Spark DataFrames概述
                      83 0
                      |
                      SQL 分布式计算 HIVE
                      pyspark笔记(RDD,DataFrame和Spark SQL)1
                      pyspark笔记(RDD,DataFrame和Spark SQL)
                      124 1
                      |
                      SQL 分布式计算 数据挖掘
                      PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
                      PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
                      699 0
                      PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
                      |
                      3月前
                      |
                      SQL 分布式计算 安全
                      |
                      6月前
                      |
                      SQL 分布式计算 安全
                      Spark的核心概念:RDD、DataFrame和Dataset
                      Spark的核心概念:RDD、DataFrame和Dataset
                      |
                      SQL 存储 分布式计算
                      pyspark笔记(RDD,DataFrame和Spark SQL)2
                      pyspark笔记(RDD,DataFrame和Spark SQL)
                      89 2
                      |
                      SQL 分布式计算 Shell
                      198 Spark DataFrames创建
                      198 Spark DataFrames创建
                      65 0
                      |
                      缓存 分布式计算 Spark
                      Spark RDD开发
                      开发步骤
                      73 0