大数据Spark Structured Streaming 2

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据Spark Structured Streaming

2.3 编程模型

Structured Streaming将流式数据当成一个不断增长的table,然后使用和批处理同一套API,都是基于DataSet/DataFrame的。如下图所示,通过将流式数据理解成一张不断增长的表,从而就可以像操作批的静态数据一样来操作流数据了。

在这个模型中,主要存在下面几个组成部分:

e785b6f0db8545c88f48f3b9c4701944.png

第一部分:Input Table(Unbounded Table),流式数据的抽象表示,没有限制边界的,表的

数据源源不断增加;

第二部分:Query(查询),对 Input Table 的增量式查询,只要Input Table中有数据,立即(默认情况)执行查询分析操作,然后进行输出(类似SparkStreaming中微批处理);

第三部分:Result Table,Query 产生的结果表;056159a6b88441659af32cf3b7a6844f.png

第四部分:Output,Result Table 的输出,依据设置的输出模式OutputMode输出结果;

Structured Streaming最核心的思想就是将实时到达的数据看作是一个不断追加的unboundtable无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中,用静态结构化数据的批处理查询方式进行流计算。

b5c5559834a042099eaf24eabd7e9850.png

以词频统计WordCount案例,Structured Streaming实时处理数据的示意图如下,各行含义:

第一行、表示从TCP Socket不断接收数据,使用【nc -lk 9999】;

第二行、表示时间轴,每隔1秒进行一次数据处理;

第三行、可以看成是“input unbound table",当有新数据到达时追加到表中;

第四行、最终的wordCounts是结果表,新数据到达后触发查询Query,输出的结果;

第五行、当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;该示例设置为CompleteMode,因此每次都将所有数据输出到控制台;cb08ac014a2043b0acb00a05ec802823.png

上图中数据实时处理说明:

第一、在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此可以得到第1秒时的结果集cat=1dog=3,并输出到控制台;

第二、当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执

行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;

第三、当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和

“owl”,执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;使用Structured Streaming处理实时数据时,会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新Result Table。

3 入门案例:WordCount

入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。

文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

3.1 功能演示

运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:274fc76b003c4d2cba0b7e328502dcf2.png

演示运行案例步骤:


第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999

第二步、打开另一个终端Terminal,执行如下命令

# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example \
--master local[2] \
--conf spark.sql.shuffle.partitions=2 \
org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \
node1.oldlu.cn 9999
# 测试数据
spark hadoop spark hadoop spark hive
spark spark spark
spark hadoop hive

发送数据以后,最终统计输出结果如下:

3761b21e811541f5807cda843886b14b.png

3.2 Socket 数据源

从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:


参数一:host,主机名称,必须指定参数

参数二:port,端口号,必须指定参数

范例如下所示:

b7d6efb2e75b4130b803d1dcf8eb9085.png

3.3 Console 接收器

将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:


参数一:numRows,打印多少条数据,默认为20条;

参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;

范例如下所示:

1a5ca1d652fb4455b86c18874cc54bcf.png

3.4 编程实现

可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:


Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;

Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,

指定读取Stream数据和保存Streamn数据,具体语法格式:

加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】

保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】词频统计案例: 从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console 。

第一点、程序入口SparkSession,加载流式数据:spark.readStream
第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式
第三点、启动流式应用,设置Output结果相关信息、start方法启动应用

完整案例代码如下:

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。
 */
        object StructuredWordCount{
        def main(args:Array[String]):Unit={
// TODO: 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        .config("spark.sql.shuffle.partitions","2") // 设置Shuffle分区数目
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 1. 从TCP Socket 读取数据
val inputStreamDF:DataFrame=spark.readStream
        .format("socket")
        .option("host","node1.oldlu.cn").option("port",9999)
        .load()
/*
root
|-- value: string (nullable = true)
*/
//inputStreamDF.printSchema()
// TODO: 2. 业务分析:词频统计WordCount
        val resultStreamDF:DataFrame=inputStreamDF
        .as[String] // 将DataFrame转换为Dataset进行操作
// 过滤数据
        .filter(line=>null!=line&&line.trim.length>0)
// 分割单词
        .flatMap(line=>line.trim.split("\\s+"))
        .groupBy($"value").count() // 按照单词分组,聚合
/*
root
|-- value: string (nullable = true)
|-- count: long (nullable = false)
*/
//resultStreamDF.printSchema()
// TODO: 3. 设置Streaming应用输出及启动
        val query:StreamingQuery=resultStreamDF.writeStream
// TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出
// .outputMode(OutputMode.Complete())
// TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出
        .outputMode(OutputMode.Update())
        .format("console")
        .option("numRows","10").option("truncate","false")
// 流式应用,需要启动start
        .start()
// 流式查询等待流式应用终止
        query.awaitTermination()
// 等待所有任务运行完成才停止运行
        query.stop()
        }
     }

其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中

所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。


4 DataStreamReader 接口

从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。

文档: http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources


c21dd86533e044ef971ec2d8c75cf17c.png

在Structured Streaming中使用SparkSession#readStream读取流式数据,返回DataStreamReader对象,指定读取数据源相关信息,声明如下:

40d809c0e8d54a29a089c6c0466cc27b.png

查看DataStreamReader中方法可以发现与DataFrameReader中基本一致,编码上更加方便加载流式数据。

993e75892cc9410ebf2f2dfa950f727b.png


5 文件数据源

将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet,可以设置相关可选参数:

79f94d7d41834190999973445cb44622.png

从文件数据源加载数据伪代码如下:


val streamDF = spark
        .readStream
// Schema must be specified when creating a streaming source DataFrame.
        .schema(schema)
// 每个trigger最大文件数量
        .option("maxFilesPerTrigger",100)
// 是否首先计算最新的文件,默认为false
        .option("latestFirst",value = true)
// 是否只检查名字,如果名字相同,则不视为更新,默认为false
        .option("fileNameOnly",value = true)
        .csv("*.csv")

演示范例:监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。


测试数据

jack;23;running
charles;32;basketball
tom;28;football
lili;24;running
bob;20;swimming
zhangsan;32;running
lisi;28;running
wangwu;24;running
zhaoliu;26;swimming
honghong;28;running`

业务实现代码,监控Windows系统目录【D:/datas】

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery}
import org.apache.spark.sql.types.{IntegerType,StringType,StructType}
import org.apache.spark.sql.{DataFrame,Dataset,Row,SparkSession}
/**
 * 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜
 */
        object StructuredFileSource{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","2")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO: 从文件系统,监控目录,读取CSV格式数据
// 数据样本 -> jack,23,running
val csvSchema:StructType=new StructType()
        .add("name",StringType,nullable=true)
        .add("age",IntegerType,nullable=true)
        .add("hobby",StringType,nullable=true)
        val inputStreamDF:DataFrame=spark.readStream
        .option("sep",";")
        .option("header","false")
// 指定schema信息
        .schema(csvSchema)
        .csv("file:///D:/datas/")
// 依据业务需求,分析数据:统计年龄小于25岁的人群的爱好排行榜
        val resultStreamDF:Dataset[Row]=inputStreamDF
// 年龄小于25岁
        .filter($"age"< 25)
// 按照爱好分组统计
        .groupBy($"hobby").count()
// 按照词频降序排序
        .orderBy($"count".desc)
// 设置Streaming应用输出及启动
        val query:StreamingQuery=resultStreamDF.writeStream
// 对流式应用输出来说,设置输出模式
        .outputMode(OutputMode.Complete())
        .format("console")
        .option("numRows","10")
        .option("truncate","false")
// 流式应用,需要启动start
        .start()
// 查询器等待流式应用终止
        query.awaitTermination()
        query.stop() // 等待所有任务运行完成才停止运行
        }
   }

6 Rate source

以每秒指定的行数生成数据,每个输出行包含2个字段:timestamp和value。其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一行)类型。此源用于测试和基准测试,可选参数如下:

f6de17ebbe2642c58c600e89bbc12a48.png

演示范例代码如下:

import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery,Trigger}
import org.apache.spark.sql.{DataFrame,SparkSession}
/**
 * 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。
 */
        object StructuredRateSource{
        def main(args:Array[String]):Unit={
// 构建SparkSession实例对象
        val spark:SparkSession=SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
// 设置Shuffle分区数目
        .config("spark.sql.shuffle.partitions","2")
        .getOrCreate()
// 导入隐式转换和函数库
import spark.implicits._
import org.apache.spark.sql.functions._
// TODO:从Rate数据源实时消费数据
val rateStreamDF:DataFrame=spark.readStream
        .format("rate")
        .option("rowsPerSecond","10") // 每秒生成数据数目
        .option("rampUpTime","0s") // 每条数据生成间隔时间
        .option("numPartitions","2") // 分区数目
        .load()
/*
root
|-- timestamp: timestamp (nullable = true)
|-- value: long (nullable = true)
*/
//rateStreamDF.printSchema()
// 3. 设置Streaming应用输出及启动
        val query:StreamingQuery=rateStreamDF.writeStream
// 设置输出模式:Append表示新数据以追加方式输出
        .outputMode(OutputMode.Append())
        .format("console")
        .option("numRows","10")
        .option("truncate","false")
// 流式应用,需要启动start
        .start()
// 流式查询等待流式应用终止
        query.awaitTermination()
// 等待所有任务运行完成才停止运行
        query.stop()
        }
    }

运行应用程序,随机生成的数据,截图如下:


efec1bfb487f4fa7b57cba7985e74ed3.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
231 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
188 2
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
182 1
|
3月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
3月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
27天前
|
SQL 数据可视化 大数据
从数据小白到大数据达人:一步步成为数据分析专家
从数据小白到大数据达人:一步步成为数据分析专家
211 92
|
3月前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
731 7
|
3月前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
97 2
|
1月前
|
分布式计算 Shell MaxCompute
odps测试表及大量数据构建测试
odps测试表及大量数据构建测试
|
3月前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
155 1