2.3 编程模型
Structured Streaming将流式数据当成一个不断增长的table,然后使用和批处理同一套API,都是基于DataSet/DataFrame的。如下图所示,通过将流式数据理解成一张不断增长的表,从而就可以像操作批的静态数据一样来操作流数据了。
在这个模型中,主要存在下面几个组成部分:
第一部分:Input Table(Unbounded Table),流式数据的抽象表示,没有限制边界的,表的
数据源源不断增加;
第二部分:Query(查询),对 Input Table 的增量式查询,只要Input Table中有数据,立即(默认情况)执行查询分析操作,然后进行输出(类似SparkStreaming中微批处理);
第三部分:Result Table,Query 产生的结果表;
第四部分:Output,Result Table 的输出,依据设置的输出模式OutputMode输出结果;
Structured Streaming最核心的思想就是将实时到达的数据看作是一个不断追加的unboundtable无界表,到达流的每个数据项就像是表中的一个新行被附加到无边界的表中,用静态结构化数据的批处理查询方式进行流计算。
以词频统计WordCount案例,Structured Streaming实时处理数据的示意图如下,各行含义:
第一行、表示从TCP Socket不断接收数据,使用【nc -lk 9999】;
第二行、表示时间轴,每隔1秒进行一次数据处理;
第三行、可以看成是“input unbound table",当有新数据到达时追加到表中;
第四行、最终的wordCounts是结果表,新数据到达后触发查询Query,输出的结果;
第五行、当有新的数据到达时,Spark会执行“增量"查询,并更新结果集;该示例设置为CompleteMode,因此每次都将所有数据输出到控制台;
上图中数据实时处理说明:
第一、在第1秒时,此时到达的数据为"cat dog"和"dog dog",因此可以得到第1秒时的结果集cat=1dog=3,并输出到控制台;
第二、当第2秒时,到达的数据为"owl cat",此时"unbound table"增加了一行数据"owl cat",执
行word count查询并更新结果集,可得第2秒时的结果集为cat=2 dog=3 owl=1,并输出到控制台;
第三、当第3秒时,到达的数据为"dog"和"owl",此时"unbound table"增加两行数据"dog"和
“owl”,执行word count查询并更新结果集,可得第3秒时的结果集为cat=2 dog=4 owl=2;使用Structured Streaming处理实时数据时,会负责将新到达的数据与历史数据进行整合,并完成正确的计算操作,同时更新Result Table。
3 入门案例:WordCount
入门案例与SparkStreaming的入门案例基本一致:实时从TCP Socket读取数据(采用nc)实时进行词频统计WordCount,并将结果输出到控制台Console。
文档:http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example
3.1 功能演示
运行词频统计WordCount程序,从TCP Socket消费数据,官方演示说明截图如下:
演示运行案例步骤:
第一步、打开终端Terminal,运行NetCat,命令为:nc -lk 9999
第二步、打开另一个终端Terminal,执行如下命令
# 官方入门案例运行:词频统计 /export/server/spark/bin/run-example \ --master local[2] \ --conf spark.sql.shuffle.partitions=2 \ org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount \ node1.oldlu.cn 9999 # 测试数据 spark hadoop spark hadoop spark hive spark spark spark spark hadoop hive
发送数据以后,最终统计输出结果如下:
3.2 Socket 数据源
从Socket中读取UTF8文本数据。一般用于测试,使用nc -lk 端口号向Socket监听的端口发送数据,用于测试使用,有两个参数:
参数一:host,主机名称,必须指定参数
参数二:port,端口号,必须指定参数
范例如下所示:
3.3 Console 接收器
将结果数据打印到控制台或者标准输出,通常用于测试或Bedug使用,三种输出模式OutputMode(Append、Update、Complete)都支持,两个参数可设置:
参数一:numRows,打印多少条数据,默认为20条;
参数二:truncate,如果某列值字符串太长是否截取,默认为true,截取字符串;
范例如下所示:
3.4 编程实现
可以认为Structured Streaming = SparkStreaming + SparkSQL,对流式数据处理使用SparkSQL数据结构,应用入口为SparkSession,对比SparkSQL与SparkStreaming编程:
Spark Streaming:将流式数据按照时间间隔(BatchInterval)划分为很多Batch,每批次数据封装在RDD中,底层RDD数据,构建StreamingContext实时消费数据;
Structured Streaming属于SparkSQL模块中一部分,对流式数据处理,构建SparkSession对象,
指定读取Stream数据和保存Streamn数据,具体语法格式:
加载数据Load:读取静态数据【spark.read】、读取流式数据【spark.readStream】
保存数据Save:保存静态数据【ds/df.write】、保存流式数据【ds/df.writeStrem】词频统计案例: 从TCP Socket实消费流式数据,进行词频统计,将结果打印在控制台Console 。
第一点、程序入口SparkSession,加载流式数据:spark.readStream 第二点、数据封装Dataset/DataFrame中,分析数据时,建议使用DSL编程,调用API,很少使用SQL方式 第三点、启动流式应用,设置Output结果相关信息、start方法启动应用
完整案例代码如下:
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery} import org.apache.spark.sql.{DataFrame,SparkSession} /** * 使用Structured Streaming从TCP Socket实时读取数据,进行词频统计,将结果打印到控制台。 */ object StructuredWordCount{ def main(args:Array[String]):Unit={ // TODO: 构建SparkSession实例对象 val spark:SparkSession=SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions","2") // 设置Shuffle分区数目 .getOrCreate() // 导入隐式转换和函数库 import spark.implicits._ import org.apache.spark.sql.functions._ // TODO: 1. 从TCP Socket 读取数据 val inputStreamDF:DataFrame=spark.readStream .format("socket") .option("host","node1.oldlu.cn").option("port",9999) .load() /* root |-- value: string (nullable = true) */ //inputStreamDF.printSchema() // TODO: 2. 业务分析:词频统计WordCount val resultStreamDF:DataFrame=inputStreamDF .as[String] // 将DataFrame转换为Dataset进行操作 // 过滤数据 .filter(line=>null!=line&&line.trim.length>0) // 分割单词 .flatMap(line=>line.trim.split("\\s+")) .groupBy($"value").count() // 按照单词分组,聚合 /* root |-- value: string (nullable = true) |-- count: long (nullable = false) */ //resultStreamDF.printSchema() // TODO: 3. 设置Streaming应用输出及启动 val query:StreamingQuery=resultStreamDF.writeStream // TODO: 设置输出模式:Complete表示将ResultTable中所有结果数据输出 // .outputMode(OutputMode.Complete()) // TODO: 设置输出模式:Update表示将ResultTable中有更新结果数据输出 .outputMode(OutputMode.Update()) .format("console") .option("numRows","10").option("truncate","false") // 流式应用,需要启动start .start() // 流式查询等待流式应用终止 query.awaitTermination() // 等待所有任务运行完成才停止运行 query.stop() } }
其中可以设置不同输出模式(OutputMode),当设置为Complete时,结果表ResultTable中
所有数据都输出;当设置为Update时,仅仅输出结果表ResultTable中更新的数据。
4 DataStreamReader 接口
从Spark 2.0至Spark 2.4版本,目前支持数据源有4种,其中Kafka 数据源使用作为广泛,其他数据源主要用于开发测试程序。
文档: http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources
在Structured Streaming中使用SparkSession#readStream读取流式数据,返回DataStreamReader对象,指定读取数据源相关信息,声明如下:
查看DataStreamReader中方法可以发现与DataFrameReader中基本一致,编码上更加方便加载流式数据。
5 文件数据源
将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet,可以设置相关可选参数:
从文件数据源加载数据伪代码如下:
val streamDF = spark .readStream // Schema must be specified when creating a streaming source DataFrame. .schema(schema) // 每个trigger最大文件数量 .option("maxFilesPerTrigger",100) // 是否首先计算最新的文件,默认为false .option("latestFirst",value = true) // 是否只检查名字,如果名字相同,则不视为更新,默认为false .option("fileNameOnly",value = true) .csv("*.csv")
演示范例:监听某一个目录,读取csv格式数据,统计年龄小于25岁的人群的爱好排行榜。
测试数据
jack;23;running charles;32;basketball tom;28;football lili;24;running bob;20;swimming zhangsan;32;running lisi;28;running wangwu;24;running zhaoliu;26;swimming honghong;28;running`
业务实现代码,监控Windows系统目录【D:/datas】
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery} import org.apache.spark.sql.types.{IntegerType,StringType,StructType} import org.apache.spark.sql.{DataFrame,Dataset,Row,SparkSession} /** * 使用Structured Streaming从目录中读取文件数据:统计年龄小于25岁的人群的爱好排行榜 */ object StructuredFileSource{ def main(args:Array[String]):Unit={ // 构建SparkSession实例对象 val spark:SparkSession=SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") // 设置Shuffle分区数目 .config("spark.sql.shuffle.partitions","2") .getOrCreate() // 导入隐式转换和函数库 import spark.implicits._ import org.apache.spark.sql.functions._ // TODO: 从文件系统,监控目录,读取CSV格式数据 // 数据样本 -> jack,23,running val csvSchema:StructType=new StructType() .add("name",StringType,nullable=true) .add("age",IntegerType,nullable=true) .add("hobby",StringType,nullable=true) val inputStreamDF:DataFrame=spark.readStream .option("sep",";") .option("header","false") // 指定schema信息 .schema(csvSchema) .csv("file:///D:/datas/") // 依据业务需求,分析数据:统计年龄小于25岁的人群的爱好排行榜 val resultStreamDF:Dataset[Row]=inputStreamDF // 年龄小于25岁 .filter($"age"< 25) // 按照爱好分组统计 .groupBy($"hobby").count() // 按照词频降序排序 .orderBy($"count".desc) // 设置Streaming应用输出及启动 val query:StreamingQuery=resultStreamDF.writeStream // 对流式应用输出来说,设置输出模式 .outputMode(OutputMode.Complete()) .format("console") .option("numRows","10") .option("truncate","false") // 流式应用,需要启动start .start() // 查询器等待流式应用终止 query.awaitTermination() query.stop() // 等待所有任务运行完成才停止运行 } }
6 Rate source
以每秒指定的行数生成数据,每个输出行包含2个字段:timestamp和value。其中timestamp是一个Timestamp含有信息分配的时间类型,并且value是Long(包含消息的计数从0开始作为第一行)类型。此源用于测试和基准测试,可选参数如下:
演示范例代码如下:
import org.apache.spark.sql.streaming.{OutputMode,StreamingQuery,Trigger} import org.apache.spark.sql.{DataFrame,SparkSession} /** * 数据源:Rate Source,以每秒指定的行数生成数据,每个输出行包含一个timestamp和value。 */ object StructuredRateSource{ def main(args:Array[String]):Unit={ // 构建SparkSession实例对象 val spark:SparkSession=SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") // 设置Shuffle分区数目 .config("spark.sql.shuffle.partitions","2") .getOrCreate() // 导入隐式转换和函数库 import spark.implicits._ import org.apache.spark.sql.functions._ // TODO:从Rate数据源实时消费数据 val rateStreamDF:DataFrame=spark.readStream .format("rate") .option("rowsPerSecond","10") // 每秒生成数据数目 .option("rampUpTime","0s") // 每条数据生成间隔时间 .option("numPartitions","2") // 分区数目 .load() /* root |-- timestamp: timestamp (nullable = true) |-- value: long (nullable = true) */ //rateStreamDF.printSchema() // 3. 设置Streaming应用输出及启动 val query:StreamingQuery=rateStreamDF.writeStream // 设置输出模式:Append表示新数据以追加方式输出 .outputMode(OutputMode.Append()) .format("console") .option("numRows","10") .option("truncate","false") // 流式应用,需要启动start .start() // 流式查询等待流式应用终止 query.awaitTermination() // 等待所有任务运行完成才停止运行 query.stop() } }
运行应用程序,随机生成的数据,截图如下: