流式DataFrame的操作

简介: 流式DataFrame的操作

一、实验目的

掌握流式DataFrame的选择、投影和聚合操作。

  掌握流式DataFrame的JOIN连接操作。

二、实验内容

1、读取json格式的文件数据源,并执行转换操作。

  2、连接两个DataFrame。

三、实验原理

使用流数据格式的DataFrame,可以将任何select和filter转换应用到它,以及应用任何在个别列上操作的Spark SQL函数。此外,基本聚合和高级分析函数也可用于流DataFrame。

 在流DataFrame中,不支持以下DataFrame转换,因为它们太过复杂,无法维护状态,或者由于流数据的无界性。

• 在流DataFrame上的多个聚合或聚合链。

• limit和take N行。

• distinct转换。但是,有一种方法可以使用唯一标识符来删除重复数据。

• 在没有任何聚合的情况下对流DataFrame进行排序。然而,在某种形式的聚合之后排序是得到支持的。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8.162,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群

1、在终端窗口下,输入以下命令,分别启动Spark集群:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

2、在HDFS上面创建文件

1.  hdfs dfs -mkdir -p /data/dataset/streaming

3、启动spark-shell。在终端窗口下,执行如下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

5.2 数据源说明

创建一个数据源文件,文件存放位置位于”/data/dataset/streaming/“目录下。在终端窗口下,执行以下命令:

1.  $  cd  /data/dataset/streaming/
2.  $  vi language.json
   文件内容如下:
1.  {"name":"python","age":12}
2.  {"name":"scala","age":23}
3.  {"name":"python","age":15}

5.3 结构化流DataFrame基本操作:投影和过滤

1、首先创建一个Schema。在spark-shell窗口下,分别执行如下代码:

1.  import org.apache.spark.sql.types._
2.       
3.  val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false)

2、读取文件数据源。在spark-shell窗口下,分别执行如下代码:

1.  val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming/")

3、过滤和投影。选择查看数据中的name 和 age 字段,并且过滤 age 大于 12的人。在spark-shell窗口下,执行如下代码:

1.  val result1 = data.select($"name",$"age").where($"age" > 12)

4、将流查询结果输出到控制台,并启动流程序。在spark-shell窗口下,执行如下代码:

1.  val query1 = result1.writeStream.format("console").outputMode("append").start()

5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.  $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

6. 切换回spark-shell窗口,可以看到如下的输出结果:

7、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.  query1.stop

5.4 结构化流DataFrame基本操作:数据聚合操作

1、删除HDFS目录下的文件

1.  $ hdfs dfs -rm /data/dataset/streaming/language.json

2、按name字段对数据进行分组,并分组对age字段进行求和。在spark-shell窗口下,执行如下代码:

1.  import org.apache.spark.sql.types._
2.       
3.  val personSchema = new StructType()add("name", StringType,false).add("age", IntegerType,false)
4.       
5.  val data = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming")
6.       
7.  val result2 = words.groupBy($"name").sum("age")
8.       
9.  val query2 = result2.writeStream.format("console").outputMode("complete").start()

3、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.  $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

4. 切换回spark-shell窗口,可以看到如下的输出结果:

由上面结果可以看出对两条python的数据中的age值进行了求和。

  5、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.  query2.stop

5.3 连接两个流DataFrame

可以用一个streaming DataFrame来做最酷的事情之一,就是join一个静态的DataFrame或者另一个streaming DataFrame。从Spark 2.3开始,结构化流支持join两个streaming DataFrames。

 1、删除HDFS目录下的文件

1.  $ hdfs dfs -rm /data/dataset/streaming/language.json

2、读取流数据。在spark-shell窗口下,执行如下代码:

1.  import org.apache.spark.sql.types._
2.       
3.  val personSchema = new StructType().add("name", StringType,false).add("age", IntegerType,false)
4.       
5.  val dataDF = spark.readStream.schema(personSchema).json("hdfs://localhost:9000/data/dataset/streaming")

3、通过join把两个DataFrame连接在一起(按name字段进行连接)。在spark-shell窗口下,执行如下代码:

1.  val df = dataDF.join(dataDF, "name")

4、输出连接后的DataFrame。在spark-shell窗口下,执行如下代码:

1.  val query = df.writeStream.format("console").outputMode("append").start()

5、将本地的/data/dataset/language.json文件上传到HDFS受流程序监视的目录中。另打开一个终端,在该终端窗口下,执行如下命令:

1.  $ hdfs dfs -put /data/dataset/streaming/language.json /data/dataset/streaming/

6. 切换回spark-shell窗口,可以看到如下的输出结果:

7、结束流程序运行。在spark-shell窗口下,执行如下代码:

1.  query.stop

— END —

相关文章
|
2月前
|
人工智能 程序员 数据处理
Pandas数据处理2、DataFrame的drop函数具体参数使用详情
Pandas数据处理2、DataFrame的drop函数具体参数使用详情
39 0
|
3月前
|
JSON 分布式计算 关系型数据库
Spark中使用DataFrame进行数据转换和操作
Spark中使用DataFrame进行数据转换和操作
|
7月前
|
存储 SQL 大数据
Pandas DataFrame 数据存储格式比较
Pandas 支持多种存储格式,在本文中将对不同类型存储格式下的Pandas Dataframe的读取速度、写入速度和大小的进行测试对比。
132 0
|
4月前
|
API 流计算
Flink(六)【DataFrame 转换算子(下)】
Flink(六)【DataFrame 转换算子(下)】
|
7月前
|
Python
dataframe操作查询
Pandas提供了多种查询方法,以下是一些常见的方法: 使用df.loc方法,根据行、列的标签值查询。 使用df.iloc方法,根据行、列的数字位置查询。 使用df.where方法,根据条件过滤数据。 使用df.query方法,根据字符串表达式查询数据。
452 0
|
7月前
|
SQL 数据处理 索引
pandas数据处理之合并与拼接
在许多应用中,数据可能来自不同的渠道,在数据处理的过程中常常需要将这些数据集进行组合合并拼接,形成更加丰富的数据集。pandas提供了多种方法完全可以满足数据处理的常用需求。具体来说包括有join、merge、concat、append等。
173 0
|
7月前
|
数据挖掘 索引 Python
pandas基本操作之数据访问(查看与检索)
对于数据分析来说,在构造或载入数据后最基本的操作应该就是对数据的访问了。看一看数据的结构、组成、分布等,根据需要从数据集中检索提取出相应的数据。pandas作为数据分析的利器,当然提供了多种查看和检索数据的方法。本文就来捋一捋pandas基本的数据访问。
161 0
|
10月前
|
数据处理
数据处理|数据框重铸
数据处理|数据框重铸
|
11月前
|
SQL 存储 分布式计算
DataFrame的操作-使用DSL
DataFrame的操作-使用DSL
|
12月前
|
机器学习/深度学习 存储 JSON
从Pandas快速切换到Polars :数据的ETL和查询
对于我们日常的数据清理、预处理和分析方面的大多数任务,Pandas已经绰绰有余。但是当数据量变得非常大时,它的性能开始下降。
228 0