Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

简介: 主要内容Spark SQL、DataFrame与Spark Streaming1. Spark SQL、DataFrame与Spark Streaming源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/stre

主要内容

  1. Spark SQL、DataFrame与Spark Streaming

1. Spark SQL、DataFrame与Spark Streaming

源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    //Socke作为数据源
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    //words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    //调用foreachRDD方法,遍历DStream中的RDD
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

运行程序后,再运行下列命令

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

处理结果:


========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+

========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+

========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
目录
相关文章
|
23天前
|
SQL 存储 关系型数据库
数据库SQL入门指南
数据库SQL入门指南
|
26天前
|
SQL 存储 分布式计算
|
8天前
|
Java 数据库连接 数据库
告别繁琐 SQL!Hibernate 入门指南带你轻松玩转 ORM,解锁高效数据库操作新姿势
【8月更文挑战第31天】Hibernate 是一款流行的 Java 持久层框架,简化了对象关系映射(ORM)过程,使开发者能以面向对象的方式进行数据持久化操作而无需直接编写 SQL 语句。本文提供 Hibernate 入门指南,介绍核心概念及示例代码,涵盖依赖引入、配置文件设置、实体类定义、工具类构建及基本 CRUD 操作。通过学习,你将掌握使用 Hibernate 简化数据持久化的技巧,为实际项目应用打下基础。
25 0
|
9天前
|
SQL 关系型数据库 数据挖掘
SQL 基础入门简直太重要啦!从零开始,带你轻松掌握数据查询与操作,开启数据世界大门!
【8月更文挑战第31天】在数字化时代,数据无处不在,而 SQL(Structured Query Language)则是开启数据宝藏的关键钥匙。无论你是编程新手还是数据处理爱好者,掌握 SQL 都能帮助你轻松提取和分析信息。SQL 简洁而强大,像一位魔法师,能从庞大数据库中迅速找到所需数据。从查询、条件筛选到排序、分组,SQL 功能多样,还能插入、更新和删除数据,助你在数据海洋中畅游无阻。
25 0
|
22天前
|
SQL 数据库 索引
SQL语言入门:如何表达你的数据需求
在数据库的世界里,SQL(Structured Query Language)是一种至关重要的语言,它允许用户与数据库进行交互,执行数据的查询、更新、插入和删除等操作
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之怎么编写和执行Spark SQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL 关系型数据库 MySQL
【MySQL从入门到精通】常用SQL语句分享
【MySQL从入门到精通】常用SQL语句分享
43 2
|
3月前
|
SQL 存储 安全
SQL入门与进阶:数据库查询与管理的实用指南
一、引言 在数字化时代,数据库已经成为各行各业存储、管理和分析数据的关键基础设施
|
3月前
|
SQL 关系型数据库 MySQL
MySQL数据库数据模型概念入门及基础的SQL语句2024
MySQL数据库数据模型概念入门及基础的SQL语句2024
34 0