Spark修炼之道(进阶篇)——Spark入门到精通:第十二节 Spark Streaming—— DStream Window操作

简介: 作者:周志湖 微信号:zhouzhihubeyond本节主要内容Window Operation入门案例1. Window OperationSpark Streaming提供窗口操作(Window Operation),如下图所示: 上图中,红色实线表示窗口当前的滑动位置,虚线表示前一次窗口位置,窗口每滑动一次,落在该窗口中的RDD被一起同时处理,生

作者:周志湖
微信号:zhouzhihubeyond

本节主要内容

  1. Window Operation
  2. 入门案例

1. Window Operation

Spark Streaming提供窗口操作(Window Operation),如下图所示:
这里写图片描述
上图中,红色实线表示窗口当前的滑动位置,虚线表示前一次窗口位置,窗口每滑动一次,落在该窗口中的RDD被一起同时处理,生成一个窗口DStream(windowed DStream),窗口操作需要设置两个参数:
(1)窗口长度(window length),即窗口的持续时间,上图中的窗口长度为3
(2)滑动间隔(sliding interval),窗口操作执行的时间间隔,上图中的滑动间隔为2
这两个参数必须是原始DStream 批处理间隔(batch interval)的整数倍(上图中的原始DStream的batch interval为1)

2. 入门案例

  1. WindowWordCount——reduceByKeyAndWindow方法使用
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    //传入的参数为localhost 9999 30 10
    if (args.length != 4) {
      System.err.println("Usage: WindowWorldCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // 创建StreamingContext,batch interval为5秒
    val ssc = new StreamingContext(sc, Seconds(5))


    //Socket为数据源
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

    val words = lines.flatMap(_.split(" "))

    // windows操作,对窗口中的单词进行计数
    val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

通过下列代码启动netcat server

root@sparkmaster:~# nc -lk 9999

再运行WindowWordCount
输入下列语句

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides

观察执行情况:

-------------------------------------------
Time: 1448778805000 ms(10秒,第一个滑动窗口时间)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778815000 ms(10秒后,第二个滑动窗口时间)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778825000 ms(10秒后,第三个滑动窗口时间)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778835000 ms(再经10秒后,超出window length窗口长度,不在计数范围内)
-------------------------------------------

-------------------------------------------
Time: 1448778845000 ms
-------------------------------------------


同样的语句输入两次

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

观察执行结果如下:

Time: 1448779205000 ms
-------------------------------------------
(provides,2)
(is,2)
(general,2)
(Big,2)
(fast,2)
(cluster,2)
(Data.,2)
(computing,2)
(Spark,2)
(a,2)
...

再输入一次

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

计算结果如下:


-------------------------------------------
Time: 1448779215000 ms
-------------------------------------------
(provides,3)
(is,3)
(general,3)
(Big,3)
(fast,3)
(cluster,3)
(Data.,3)
(computing,3)
(Spark,3)
(a,3)
...

再输入一次

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

计算结果如下:


-------------------------------------------
Time: 1448779225000 ms
-------------------------------------------
(provides,4)
(is,4)
(general,4)
(Big,4)
(fast,4)
(cluster,4)
(Data.,4)
(computing,4)
(Spark,4)
(a,4)
...

-------------------------------------------
Time: 1448779235000 ms
-------------------------------------------
(provides,2)
(is,2)
(general,2)
(Big,2)
(fast,2)
(cluster,2)
(Data.,2)
(computing,2)
(Spark,2)
(a,2)
...

-------------------------------------------
Time: 1448779245000 ms
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448779255000 ms
-------------------------------------------

-------------------------------------------
Time: 1448779265000 ms
-------------------------------------------

2 WindowWordCount——countByWindow方法使用


import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: WindowWorldCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    // 定义checkpoint目录为当前目录
    ssc.checkpoint(".")


    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))

    //countByWindowcountByWindow方法计算基于滑动窗口的DStream中的元素的数量。
    val countByWindow=words.countByWindow(Seconds(args(2).toInt), Seconds(args(3).toInt))

    countByWindow.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

启动

root@sparkmaster:~# nc -lk 9999

然后运行WindowWordCount
输入

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data

察看运行结果:

-------------------------------------------
Time: 1448780625000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1448780635000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780645000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780655000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780665000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1448780675000 ms
-------------------------------------------
0

3 WindowWordCount——reduceByWindow方法使用

//reduceByWindow方法基于滑动窗口对源DStream中的元素进行聚合操作,返回包含单元素的一个新的DStream。
 val reduceByWindow=words.map(x=>1).reduceByWindow(_+_,_-_Seconds(args(2).toInt), Seconds(args(3).toInt))

上面的例子其实是countByWindow的实现,可以在countByWindow源码实现中得到验证

def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }

而reduceByWindow又是通过reduceByKeyAndWindow方法来实现的,具体代码如下

def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
      this.map(x => (1, x))
          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
          .map(_._2)
  }

与前面的例子中的reduceByKeyAndWindow方法不同的是这里的reduceByKeyAndWindow方法多了一个invReduceFunc参数,方法完整源码如下:

 /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of over a new window is calculated using the old window's reduced value :
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   *
   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
   * However, it is applicable to only "invertible reduce functions".
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc associative reduce function
   * @param invReduceFunc inverse reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param filterFunc     Optional function to filter expired key-value pairs;
   *                       only pairs that satisfy the function are retained
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

具体来讲,下面两个方法得到的结果是一样的,只是效率不同,后面的方法方式效率更高:

//以过去5秒钟为一个输入窗口,每1秒统计一下WordCount,本方法会将过去5秒钟的每一秒钟的WordCount都进行统计
//然后进行叠加,得出这个窗口中的单词统计。 这种方式被称为叠加方式,如下图左边所示
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))

//计算t+4秒这个时刻过去5秒窗口的WordCount,可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量
//再减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。 这种方式被称为增量方式,如下图的右边所示
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))

这里写图片描述

DStream支持的全部Window操作方法如下:
这里写图片描述

目录
相关文章
|
1月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
40 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
82 0
|
7天前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
16天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
48 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
59 0
|
17天前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
45 6
|
15天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
59 2
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
56 1
|
17天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
48 1