Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

简介: 主要内容Spark SQL、DataFrame与Spark Streaming1. Spark SQL、DataFrame与Spark Streaming源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/stre

主要内容

  1. Spark SQL、DataFrame与Spark Streaming

1. Spark SQL、DataFrame与Spark Streaming

源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    //Socke作为数据源
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    //words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    //调用foreachRDD方法,遍历DStream中的RDD
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

运行程序后,再运行下列命令

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

处理结果:


========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+

========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+

========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
目录
相关文章
|
5天前
|
SQL 存储 分布式计算
|
4天前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
12 1
|
12天前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
22 3
|
2月前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
135 56
|
1月前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
1月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
30天前
|
分布式计算 Apache Spark
|
2月前
|
SQL JSON 分布式计算
|
1月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
83 1
Spark快速大数据分析PDF下载读书分享推荐
|
分布式计算 Spark 流计算
SparkStreaming入门及例子
看书大概了解了下Streaming的原理,但是木有动过手啊。。。万事开头难啊,一个wordcount 2小时怎么都运行不出结果。是我太蠢了,好了言归正传。 SparkStreaming是一个批处理的流式计算框架,适合处理实时数据与历史数据混合处理的场景(比如,你用streaming将实时数据读入处理,再使用sparkSQL提取历史数据,与之关联处理)。
1056 0