大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)

接上篇:https://developer.aliyun.com/article/1622641?spm=a2c6h.13148508.setting.19.27ab4f0ehhuqRu

[窗口操作] 案例3 热点搜索词实时统计

编写代码

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HotWordStats {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HotWordStats")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))
    // 检查点设置 也可以设置到 HDFS
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))

    // 通过 reduceByKeyAndWindow算子 每隔10秒统计最近20秒的词出现的的次数
    val wordCounts1: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2
      )
    wordCounts1.print()

    // 需要CheckPoint的支持
    val wordCounts2: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        _ + _, _ - _, Seconds(20), Seconds(10), 2
      )
    wordCounts2.print()

    // 运行程序
    ssc.start()
    ssc.awaitTermination()
  }

}

运行结果

-------------------------------------------
Time: 1721629842000 ms
-------------------------------------------
(4,1)
(8,1)
(6,1)
(2,1)
(7,1)
(5,1)
(3,1)
(1,1)

-------------------------------------------
Time: 1721629842000 ms
--------------------

运行结果如下图:

[状态追踪操作] updateStateByKey

UpdateStateByKey的主要功能:


为Streaming中每一个Key维护一份State状态,state类型可以是任意类型的,可以是自定义对象,更新函数也可以是自定义的

通过更新函数对该Key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候已经存在的key进行state状态更新

使用updateStateByKey时要开启 CheckPoint 功能

编写代码1

流式程序启动后计算wordcount的累计值,将每个批次的结果保存到文件

package icu.wzk


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateTracker1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StateTracker1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    // 定义状态更新函数
    // 函数常量定义 返回类型是 Some(Int),表示的含义是最新状态
    // 函数的功能是将当前时间间隔内产生的Key的Value集合,加到上一个状态中,得到最新状态
    val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // 通过Spark内部的reduceByKey按Key规约,然后这里传入某Key当前批次的Seq,再计算当前批次的总和
      val currentCount = currValues.sum
      // 已累加的值
      val previousCount = prevValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
    stateDStream.print()

    // 把DStream保存到文本文件中 会生成很多的小文件 一个批次生成一个目录
    val outputDir = "output1"
    stateDStream
      .repartition(1)
      .saveAsTextFiles(outputDir)

    // 开始运行
    ssc.start()
    ssc.awaitTermination()
  }
}

运行结果1

-------------------------------------------
Time: 1721631080000 ms
-------------------------------------------
(1,1)
(2,1)
(3,1)

-------------------------------------------
Time: 1721631085000 ms
-------------------------------------------
(8,1)
(1,1)
(2,1)
(3,1)
(4,1)
(5,1)
(6,1)
(7,1)

运行结果是:

统计全局的Key的状态,但是就算没有数据输入,也会在每一个批次的时候返回之前的Key的状态。


这样的缺点:


如果数据量很大的话,CheckPoint数据会占用较大存储,而且效率也不高

编写代码2

mapWithState:也是用于全局统计Key的状态,如果没有数据输入,便不会返回之前的Key的状态,有一点增量的感觉。

这样做的好处是,只关心那些已经发生的变化的Key,对于没有数据输入,则不会返回那些没有变化的Key的数据,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。

package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateTracker2 {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("StateTracker2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))

    def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
      val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    val spec = StateSpec.function(mappingFunction _)
    val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)

    resultDStream.cache()

    // 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
    val outputDir = "output2"
    resultDStream.repartition(1).saveAsTextFiles(outputDir)

    ssc.start()
    ssc.awaitTermination()

  }
}

运行代码

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
157 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
106 6
|
3月前
|
SQL 分布式计算 NoSQL
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
42 1
大数据-164 Apache Kylin Cube优化 案例1 定义衍生维度与对比 超详细
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
137 2
|
2月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
98 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
77 1
|
3月前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
70 1
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
47 1
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
57 3

热门文章

最新文章