Spark修炼之道(进阶篇)——Spark入门到精通:第十四节 Spark Streaming 缓存、Checkpoint机制

简介: 作者:周志湖 微信号:zhouzhihubeyond主要内容本节内容基于官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.htmlSpark Stream 缓存Checkpoint案例1. Spark Stream 缓存通过前面一系列的课程介绍,我们知道DS

作者:周志湖
微信号:zhouzhihubeyond

主要内容

本节内容基于官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

  1. Spark Stream 缓存
  2. Checkpoint
  3. 案例

1. Spark Stream 缓存

通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一样,也可以将流式数据持久化到内容当中,采用的同样是persisit方法,调用该方法后DStream将持久化所有的RDD数据。这对于一些需要重复计算多次或数据需要反复被使用的DStream特别有效。像reduceByWindow、reduceByKeyAndWindow等基于窗口操作的方法,它们默认都是有persisit操作的。reduceByKeyAndWindow方法源码具体如下:

def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner,
      filterFunc: ((K, V)) => Boolean
    ): DStream[(K, V)] = ssc.withScope {

    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
    new ReducedWindowedDStream[K, V](
      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
      windowDuration, slideDuration, partitioner
    )
  }

从上面的方法来看,它最返回的是一个ReducedWindowedDStream对象,跳到该类的源码中可以看到在其主构造函数中包含下面两段代码:

private[streaming]
class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
    parent: DStream[(K, V)],
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    filterFunc: Option[((K, V)) => Boolean],
    _windowDuration: Duration,
    _slideDuration: Duration,
    partitioner: Partitioner
  ) extends DStream[(K, V)](parent.ssc) {
  //省略其它非关键代码

  //默认被缓存到内存当中
  // Persist RDDs to memory by default as these RDDs are going to be reused.
  super.persist(StorageLevel.MEMORY_ONLY_SER)
  reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
}

通过上面的代码我们可以看到,通过窗口操作产生的DStream不需要开发人员手动去调用persist方法,Spark会自动帮我们将数据缓存当内存当中。同一般的RDD类似,DStream支持的persisit级别为:
这里写图片描述

2. Checkpoint机制

通过前期对Spark Streaming的理解,我们知道,Spark Streaming应用程序如果不手动停止,则将一直运行下去,在实际中应用程序一般是24小时*7天不间断运行的,因此Streaming必须对诸如系统错误、JVM出错等与程序逻辑无关的错误(failures )具体很强的弹性,具备一定的非应用程序出错的容错性。Spark Streaming的Checkpoint机制便是为此设计的,它将足够多的信息checkpoint到某些具备容错性的存储系统如HDFS上,以便出错时能够迅速恢复。有两种数据可以chekpoint:

(1)Metadata checkpointing
将流式计算的信息保存到具备容错性的存储上如HDFS,Metadata Checkpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括:
Configuration(配置信息) - 创建streaming应用程序的配置信息
DStream operations - 在streaming应用程序中定义的DStreaming操作
Incomplete batches - 在列队中没有处理完的作业

(2)Data checkpointing
将生成的RDD保存到外部可靠的存储当中,对于一些数据跨度为多个bactch的有状态tranformation操作来说,checkpoint非常有必要,因为在这些transformation操作生成的RDD对前一RDD有依赖,随着时间的增加,依赖链可能会非常长,checkpoint机制能够切断依赖链,将中间的RDD周期性地checkpoint到可靠存储当中,从而在出错时可以直接从checkpoint点恢复。

具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

Checkpointing具体的使用方式时通过下列方法:

//checkpointDirectory为checkpoint文件保存目录
streamingContext.checkpoint(checkpointDirectory)

3. 案例

程序来源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
进行了适量修改

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam

/**
 * Counts words in text encoded with UTF8 received from the network every second.
 *
 * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 *   <output-file> file to which the word counts will be appended
 *
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
 * To run this on your local machine, you need to first run a Netcat server
 *
 *      `$ nc -lk 9999`
 *
 * and run the example as
 *
 *      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
 *              localhost 9999 ~/checkpoint/ ~/out`
 *
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
 * Refer to the online documentation for more details.
 */
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {


    //程序第一运行时会创建该条语句,如果应用程序失败,则会从checkpoint中恢复,该条语句不会执行
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[4]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    //将socket作为数据源
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
      println(counts)
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
    })
    ssc
  }
  //将String转换成Int
  private object IntParam {
  def unapply(str: String): Option[Int] = {
    try {
      Some(str.toInt)
    } catch {
      case e: NumberFormatException => None
    }
  }
}
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
   val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    //getOrCreate方法,从checkpoint中重新创建StreamingContext对象或新创建一个StreamingContext对象
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(ip, port, outputPath, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

输入参数配置如下:
这里写图片描述

运行状态图如下:
这里写图片描述

首次运行时:

//创建新的StreamingContext
Creating new context
15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Counts at time 1448896840000 ms []
Appending to /root/out2
15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers
Counts at time 1448896850000 ms [(Spark,1), (Context,1)]

手动将程序停止,然后重新运行

//这时从checkpoint目录中读取元数据信息,进行StreamingContext的恢复
Counts at time 1448897070000 ms []
Appending to /root/out2
Counts at time 1448897080000 ms []
Appending to /root/out2
Counts at time 1448897090000 ms []
Appending to /root/out2
15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers
[Stage 8:>                                                          (0 + 0) / 4]Counts at time 1448897100000 ms [(Spark,1), (Context,1)]
Appending to /root/out2
目录
相关文章
|
3月前
|
缓存 Java 数据库连接
mybatis复习05,mybatis的缓存机制(一级缓存和二级缓存及第三方缓存)
文章介绍了MyBatis的缓存机制,包括一级缓存和二级缓存的配置和使用,以及如何整合第三方缓存EHCache。详细解释了一级缓存的生命周期、二级缓存的开启条件和配置属性,以及如何通过ehcache.xml配置文件和logback.xml日志配置文件来实现EHCache的整合。
mybatis复习05,mybatis的缓存机制(一级缓存和二级缓存及第三方缓存)
|
4月前
|
缓存 应用服务中间件 nginx
Web服务器的缓存机制与内容分发网络(CDN)
【8月更文第28天】随着互联网应用的发展,用户对网站响应速度的要求越来越高。为了提升用户体验,Web服务器通常会采用多种技术手段来优化页面加载速度,其中最重要的两种技术就是缓存机制和内容分发网络(CDN)。本文将深入探讨这两种技术的工作原理及其实现方法,并通过具体的代码示例加以说明。
455 1
|
26天前
|
存储 缓存 监控
后端开发中的缓存机制:深度解析与最佳实践####
本文深入探讨了后端开发中不可或缺的一环——缓存机制,旨在为读者提供一份详尽的指南,涵盖缓存的基本原理、常见类型(如内存缓存、磁盘缓存、分布式缓存等)、主流技术选型(Redis、Memcached、Ehcache等),以及在实际项目中如何根据业务需求设计并实施高效的缓存策略。不同于常规摘要的概述性质,本摘要直接点明文章将围绕“深度解析”与“最佳实践”两大核心展开,既适合初学者构建基础认知框架,也为有经验的开发者提供优化建议与实战技巧。 ####
|
22天前
|
缓存 Java 数据库连接
MyBatis缓存机制
MyBatis提供两级缓存机制:一级缓存(Local Cache)默认开启,作用范围为SqlSession,重复查询时直接从缓存读取;二级缓存(Second Level Cache)需手动开启,作用于Mapper级别,支持跨SqlSession共享数据,减少数据库访问,提升性能。
27 1
|
25天前
|
缓存 Java 数据库连接
深入探讨:Spring与MyBatis中的连接池与缓存机制
Spring 与 MyBatis 提供了强大的连接池和缓存机制,通过合理配置和使用这些机制,可以显著提升应用的性能和可扩展性。连接池通过复用数据库连接减少了连接创建和销毁的开销,而 MyBatis 的一级缓存和二级缓存则通过缓存查询结果减少了数据库访问次数。在实际应用中,结合具体的业务需求和系统架构,优化连接池和缓存的配置,是提升系统性能的重要手段。
41 4
|
2月前
|
缓存 NoSQL Java
springboot的缓存和redis缓存,入门级别教程
本文介绍了Spring Boot中的缓存机制,包括使用默认的JVM缓存和集成Redis缓存,以及如何配置和使用缓存来提高应用程序性能。
129 1
springboot的缓存和redis缓存,入门级别教程
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
存储 缓存 负载均衡
Nginx代理缓存机制
【10月更文挑战第2天】
106 4
|
2月前
|
存储 缓存 NoSQL
深入理解后端缓存机制的重要性与实践
本文将探讨在后端开发中缓存机制的应用及其重要性。缓存,作为提高系统性能和用户体验的关键技术,对于后端开发来说至关重要。通过减少数据库访问次数和缩短响应时间,缓存可以显著提升应用程序的性能。本文将从缓存的基本概念入手,介绍常见的缓存策略和实现方式,并通过实例展示如何在后端开发中有效应用缓存技术。最后,我们将讨论缓存带来的一些挑战及其解决方案,帮助您在实际项目中更好地利用缓存机制。
|
3月前
|
存储 缓存 Android开发
Android RecyclerView 缓存机制深度解析与面试题
本文首发于公众号“AntDream”,详细解析了 `RecyclerView` 的缓存机制,包括多级缓存的原理与流程,并提供了常见面试题及答案。通过本文,你将深入了解 `RecyclerView` 的高性能秘诀,提升列表和网格的开发技能。
80 8