大数据Spark Structured Streaming 2

简介: 大数据Spark Structured Streaming

2.3 编程模型

Structured Streaming将流式数据当成一个不断增长的table,然后使用和批处理同一套API,都是基于DataSet/DataFrame的。如下图所示,通过将流式数据理解成一张不断增长的表,从而就可以像操作批的静态数据一样来操作流数据了。

在这个模型中,主要存在下面几个组成部分:

e785b6f0db8545c88f48f3b9c4701944.png

第一部分:Input Table(Unbounded Table),流式数据的抽象表示,没有限制边界的,表的

数据源源不断增加;

第二部分:Query(查询),对 Input Table 的增量式查询,只要Input Table中有数据,立即(默认情况)执行查询分析操作,然后进行输出(类似SparkStreaming中微批处理);

第三部分:Result Table,Query 产生的结果表;056159a6b88441659af32cf3b7a6844f.png

第四部分:Output,Result Table 的输出,依据设置的输出模式OutputMode输出结果;

Structured Streaming最核心的思想就是将实时到达的数据看作是一个不断追加的unboundtable无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中,用静态结构化数据的批处理查询方式进行流计算。

b5c5559834a042099eaf24eabd7e9850.png

以词频统计WordCount案例,Structured Streaming实时处理数据的示意图如下,各行含义:

第一行、表示从TCP Socket不断接收数据,使用【nc -lk 9999】;

第二行、表示时间轴,每隔1秒进行一次数据处理;

第三行、可以看成是“input unbound table",当有新数据到达时追加到表中;

第四行、最终的wordCounts是结果表,新数据到达后触发查询Query,输出的结果;

第五行、当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;该示例设置为CompleteMode,因此每次都将所有数据输出到控制台;cb08ac014a2043b0acb00a05ec802823.png

上图中数据实时处理说明:

第一、在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此可以得到第1秒时的结果集cat=1dog=3,并输出到控制台;

第二、当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执

行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;

第三、当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和

“owl”,执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;使用Structured Streaming处理实时数据时,会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新Result Table。

3 入门案例:WordCount

入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。

文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

3.1 功能演示

运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:274fc76b003c4d2cba0b7e328502dcf2.png

演示运行案例步骤:


第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999

第二步、打开另一个终端Terminal,执行如下命令

# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.oldlu.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive

发送数据以后,最终统计输出结果如下:

3761b21e811541f5807cda843886b14b.png

3.2 Socket 数据源

从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:


参数一:host,主机名称,必须指定参数

参数二:port,端口号,必须指定参数

范例如下所示:

b7d6efb2e75b4130b803d1dcf8eb9085.png

3.3 Console 接收器

将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:


参数一:numRows,打印多少条数据,默认为20条;

参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;

范例如下所示:

1a5ca1d652fb4455b86c18874cc54bcf.png

3.4 编程实现

可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:


Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;

Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,

指定读取Stream数据和保存Streamn数据,具体语法格式:

加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】

保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】词频统计案例: 从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console 。

第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用

完整案例代码如下:

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
 */
        object StructuredWordCount{
        def main(args:Array[String]):Unit={
// TODO: 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        .config("spark.sql.shuffle.partitions","2") // 设置Shuffle分区数目
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF:DataFrame=spark.readStream
        .format("socket")
        .option("host","node1.oldlu.cn").option("port",9999)
        .load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
        val resultStreamDF:DataFrame=inputStreamDF
        .as[String] // 将DataFrame转换为Dataset进行操作
// 过滤数据
        .filter(line=>null!=line&&line.trim.length>0)
// 分割单词
        .flatMap(line=>line.trim.split("\\s+"))
        .groupBy($"value").count() // 按照单词分组,聚合
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
        val query:StreamingQuery=resultStreamDF.writeStream
// TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出
// .outputMode(OutputMode.Complete())
// TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出
        .outputMode(OutputMode.Update())
        .format("console")
        .option("numRows","10").option("truncate","false")
// 流式应用,需要启动start
        .start()
// 流式查询等待流式应用终止
        query.awaitTermination()
// 等待所有任务运行完成才停止运行
        query.stop()
        }
     }

其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中

所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。


4 DataStreamReader 接口

从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。

文档: http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources


c21dd86533e044ef971ec2d8c75cf17c.png

在Structured Streaming中使用SparkSession#readStream读取流式数据,返回DataStreamReader对象,指定读取数据源相关信息,声明如下:

40d809c0e8d54a29a089c6c0466cc27b.png

查看DataStreamReader中方法可以发现与DataFrameReader中基本一致,编码上更加方便加载流式数据。

993e75892cc9410ebf2f2dfa950f727b.png


5 文件数据源

将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet,可以设置相关可选参数:

79f94d7d41834190999973445cb44622.png

从文件数据源加载数据伪代码如下:


val streamDF = spark
        .readStream
// Schema must be specified when creating a streaming source DataFrame.
        .schema(schema)
// 每个trigger最大文件数量
        .option("maxFilesPerTrigger",100)
// 是否首先计算最新的文件,默认为false
        .option("latestFirst",value = true)
// 是否只检查名字,如果名字相同,则不视为更新,默认为false
        .option("fileNameOnly",value = true)
        .csv("*.csv")

演示范例:监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。


测试数据

jack;23;running
charles;32;basketball
tom;28;football
lili;24;running
bob;20;swimming
zhangsan;32;running
lisi;28;running
wangwu;24;running
zhaoliu;26;swimming
honghong;28;running`

业务实现代码,监控Windows系统目录【D:/datas】

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.types.{IntegerType,StringType,StructType}
import org.apache.spark.sql.{DataFrame,Dataset,Row,SparkSession}
/**
 * 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
 */
        object StructuredFileSource{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","2")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 从文件系统,监控目录,读取CSV格式数据
// 数据样本 -> jack,23,running
val csvSchema:StructType=new StructType()
        .add("name",StringType,nullable=true)
        .add("age",IntegerType,nullable=true)
        .add("hobby",StringType,nullable=true)
        val inputStreamDF:DataFrame=spark.readStream
        .option("sep",";")
        .option("header","false")
// 指定schema信息
        .schema(csvSchema)
        .csv("file:///D:/datas/")
// 依据业务需求,分析数据:统计年龄小于25岁的人群的爱好排行榜
        val resultStreamDF:Dataset[Row]=inputStreamDF
// 年龄小于25岁
        .filter($"age"< 25)
// 按照爱好分组统计
        .groupBy($"hobby").count()
// 按照词频降序排序
        .orderBy($"count".desc)
// 设置Streaming应用输出及启动
        val query:StreamingQuery=resultStreamDF.writeStream
// 对流式应用输出来说,设置输出模式
        .outputMode(OutputMode.Complete())
        .format("console")
        .option("numRows","10")
        .option("truncate","false")
// 流式应用,需要启动start
        .start()
// 查询器等待流式应用终止
        query.awaitTermination()
        query.stop() // 等待所有任务运行完成才停止运行
        }
   }

6 Rate source

以每秒指定的行数生成数据,每个输出行包含2个字段:timestamp和value。其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一行)类型。此源用于测试和基准测试,可选参数如下:

f6de17ebbe2642c58c600e89bbc12a48.png

演示范例代码如下:

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery,Trigger}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。
 */
        object StructuredRateSource{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","2")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO:从Rate数据源实时消费数据
val rateStreamDF:DataFrame=spark.readStream
        .format("rate")
        .option("rowsPerSecond","10") // 每秒生成数据数目
        .option("rampUpTime","0s") // 每条数据生成间隔时间
        .option("numPartitions","2") // 分区数目
        .load()
/*
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
*/
//rateStreamDF.printSchema()
// 3. 设置Streaming应用输出及启动
        val query:StreamingQuery=rateStreamDF.writeStream
// 设置输出模式:Append表示新数据以追加方式输出
        .outputMode(OutputMode.Append())
        .format("console")
        .option("numRows","10")
        .option("truncate","false")
// 流式应用,需要启动start
        .start()
// 流式查询等待流式应用终止
        query.awaitTermination()
// 等待所有任务运行完成才停止运行
        query.stop()
        }
    }

运行应用程序,随机生成的数据,截图如下:


efec1bfb487f4fa7b57cba7985e74ed3.png

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
9小时前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
9小时前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
10小时前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之spark3.1.1通过resource目录下的conf文件配置,报错如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
9小时前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
143 0
|
9小时前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
10小时前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
59 0
|
10小时前
|
分布式计算 大数据 Java
Spark 大数据实战:基于 RDD 的大数据处理分析
Spark 大数据实战:基于 RDD 的大数据处理分析
138 0
|
分布式计算 API 流计算
Spark 2.0 Structured Streaming 分析
Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据,复用了其对象的Catalyst引擎。
3600 0
|
10小时前
|
机器学习/深度学习 SQL 分布式计算
Apache Spark 的基本概念和在大数据分析中的应用
介绍 Apache Spark 的基本概念和在大数据分析中的应用
163 0
|
10小时前
|
机器学习/深度学习 SQL 分布式计算
介绍 Apache Spark 的基本概念和在大数据分析中的应用。
介绍 Apache Spark 的基本概念和在大数据分析中的应用。