大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(正在更新!)

章节内容

上节我们完成了如下的内容:


Spark Streaming 基础概述

Spark Streaming 架构概念

编程模型

优点缺点概括

与 Kafka 集成

1e213702e80d7e50a861e7125223c8e3_febbafd8dd8f418a9f285b3e98434570.png 基础概念

基础数据源包括:


文件系统(File System):Spark Streaming 支持监控 HDFS、S3、本地文件系统等目录中的新文件,并将这些文件作为数据流的一部分进行处理。这个数据源适用于处理批量生成的文件。

Socket 数据流(Socket Stream):这是最简单的数据源之一,Spark Streaming 可以通过 TCP 套接字连接接收文本数据流。例如,你可以使用 nc(Netcat)工具向指定端口发送数据,Spark Streaming 可以实时读取这些数据。

Kafka:Kafka 是一个分布式消息系统,常用于构建实时流处理应用。Spark Streaming 提供了直接和高级两种 Kafka 数据源集成方式,支持从 Kafka 主题中读取数据流。

Flume:Apache Flume 是一个分布式、可靠且高可用的系统,用于高效收集、聚合和传输大量日志数据。Spark Streaming 可以通过 Flume 接收数据并处理,常用于日志收集和分析。

Kinesis:Amazon Kinesis 是一个实时流处理服务,Spark Streaming 提供了 Kinesis 数据源的支持,能够从 Kinesis 流中读取数据,并进行实时分析。

自定义数据源:Spark Streaming 允许用户实现自定义的输入源。用户可以通过实现 Receiver 类或使用 Direct DStream API 来创建新的数据源。

f97bb682bae1365d1f3332ec149bfe10_3460c2a52aeb4f1992c0d8d46ce239d3.png

引入依赖

我们使用的话,需要引入依赖:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

文件数据流

基础概念

通过 textFileStreama 方法进行读取 HDFS 兼容的文件系统文件

Spark Streaming 将会监控 directory 目录,并不断处理移动进来的文件


不支持嵌套目录

文件需要有相同的数据格式

文件进入 Directory 的方式需要通过移动或者重命名来实现

一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据

文件流不需要接收器(Receiver),不需要单独分配CPU核

编写代码

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("FileDStream")
      .setMaster("local[*]")

    // 时间间隔
    val ssc = new StreamingContext(conf, Seconds(5))
    // 本地文件,也可以使用 HDFS 文件
    val lines = ssc.textFileStream("goodtbl.java")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    // 打印信息
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

代码解析

object FileDStream: 定义了一个名为 FileDStream 的单例对象,包含 main 方法,这是 Scala 中的入口点,相当于 Java 的 public static void main 方法。

Logger.getLogger(“org”).setLevel(Level.ERROR): 这行代码将日志级别设置为 ERROR,以减少控制台输出的日志信息,只显示错误级别的信息。这通常是为了避免不必要的日志干扰核心的输出。

val conf = new SparkConf(): 创建一个 SparkConf 对象,包含了应用程序的配置信息。

setAppName(“FileDStream”): 设置应用程序的名称为 “FileDStream”。这个名称会在 Spark Web UI 中显示,用于识别应用。

setMaster("local[]"): 设置 Spark 的运行模式为本地模式(local[]),这意味着应用程序将在本地运行,并使用所有可用的 CPU 核心。

val ssc = new StreamingContext(conf, Seconds(5)): 创建一个 StreamingContext 对象,负责管理 Spark Streaming 应用程序的上下文。Seconds(5) 指定了微批处理的时间间隔为 5 秒,也就是每 5 秒钟会处理一次数据。

val words = lines.flatMap(_.split(“\s+”)): 对每一行文本内容进行处理,使用空格或其他空白字符(\s+)进行分割,将每行文本拆分成单词。flatMap 操作会将结果展开为一个包含所有单词的 DStream。

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _): 通过 map 操作将每个单词映射为 (word, 1) 形式的键值对,然后使用 reduceByKey 按键(即单词)进行聚合,计算每个单词的出现次数。

wordCounts.print(): 将计算结果打印到控制台,每 5 秒钟输出一次当前批次中每个单词的计数结果。

ssc.start(): 启动 Spark Streaming 的计算,这会使得 Spark 开始监听数据源并开始处理数据流。

ssc.awaitTermination(): 阻塞当前线程,等待流计算结束,通常是等待手动停止应用程序。这个方法会让程序保持运行,直到手动终止或遇到异常。

运行结果

【备注:使用 local[],可能会存在问题。】

【如果给虚拟机配置的CPU数为1,使用 local[] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】

【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

40eeefd69b8a67d206846c8a1166b064_71ea9dc749f14b8b95d34eddab6d7d8b.png

Socket数据流

编写代码

Spark Streaming 可以通过Socket端口监听并接受数据,然后进行相应处理:

打开一个新的命令窗口,启动 nc 程序。(在Flink中也这么用过)

# 如果没有的话 你需要安装一下
nc -lk 9999

编写运行的代码:

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apachea.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("SocketStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1));
    val lines = ssc.socketTextStream("0.0.0.0", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

随后可以在nc窗口中随意输入一些单词,监听窗口会自动获取单词数据流信息,在监听窗口每X秒就会打印出词频的统计信息,可以在屏幕是上出现结果。


运行结果

【备注:使用 local[],可能会存在问题。】

【如果给虚拟机配置的CPU数为1,使用 local[] 也会只启动一个线程,该线程用于 Receiver Task,此时没有资源处理接受到达的数据。】

【现象:程序正常执行,不会打印时间戳,屏幕上也不会有其他有消息信息】

此时,从控制台启动后,输入内容

RDD队列流

基础概念

调用 Spark Streaming应用程序的时候,可使用 streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream

备注:


oneAtTime:缺省为true,一次处理一个RDD,设为False,一次处理全部RDD

RDD队列流可以使用 local[1]

涉及到同时出队和入队操作,所以要做同步

每秒创建一个RDD(RDD存放1-100的整数),Streaming每隔1秒就对数据进行处理,计算RDD中数据除10取余的个数。


队列流优点

适用于测试和开发:RDD 队列流主要用于开发和调试阶段,它允许你在没有真实数据源的情况下测试 Spark Streaming 应用程序。

RDD 队列:你可以创建一个包含 RDD 的队列(Queue),Spark Streaming 会从这个队列中逐一获取 RDD,并将其作为数据流的一部分进行处理。

灵活性:由于是手动创建的 RDD 队列,因此你可以完全控制数据的内容、数量以及生成的速度,从而测试各种场景下的应用表现。

编写代码

编写代码如下:

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.Queue

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf()
      .setAppName("RDDQueueStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()

    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}

运行结果

运行结果如图所示:

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
6月前
|
人工智能 分布式计算 调度
打破资源边界、告别资源浪费:ACK One 多集群Spark和AI作业调度
ACK One多集群Spark作业调度,可以帮助您在不影响集群中正在运行的在线业务的前提下,打破资源边界,根据各集群实际剩余资源来进行调度,最大化您多集群中闲置资源的利用率。
|
10月前
|
存储 分布式计算 并行计算
【赵渝强老师】Spark中的RDD
RDD(弹性分布式数据集)是Spark的核心数据模型,支持分布式并行计算。RDD由分区组成,每个分区由Spark Worker节点处理,具备自动容错、位置感知调度和缓存机制等特性。通过创建RDD,可以指定分区数量,并实现计算函数、依赖关系、分区器和优先位置列表等功能。视频讲解和示例代码进一步详细介绍了RDD的组成和特性。
187 0
|
7月前
|
分布式计算 Spark
【赵渝强老师】Spark RDD的依赖关系和任务阶段
Spark RDD之间的依赖关系分为窄依赖和宽依赖。窄依赖指父RDD的每个分区最多被一个子RDD分区使用,如map、filter操作;宽依赖则指父RDD的每个分区被多个子RDD分区使用,如分组和某些join操作。窄依赖任务可在同一阶段完成,而宽依赖因Shuffle的存在需划分不同阶段执行。借助Spark Web Console可查看任务的DAG图及阶段划分。
297 15
|
7月前
|
存储 缓存 分布式计算
【赵渝强老师】Spark RDD的缓存机制
Spark RDD通过`persist`或`cache`方法可将计算结果缓存,但并非立即生效,而是在触发action时才缓存到内存中供重用。`cache`方法实际调用了`persist(StorageLevel.MEMORY_ONLY)`。RDD缓存可能因内存不足被删除,建议结合检查点机制保证容错。示例中,读取大文件并多次调用`count`,使用缓存后执行效率显著提升,最后一次计算仅耗时98ms。
158 0
【赵渝强老师】Spark RDD的缓存机制
|
8月前
|
存储 分布式计算 调度
Spark Master HA 主从切换过程不会影响到集群已有作业的运行, 为什么?
Spark Master 的高可用性(HA)机制确保主节点故障时,备用主节点能无缝接管集群管理,保障稳定运行。关键在于: 1. **Driver 和 Executor 独立**:任务执行不依赖 Master。 2. **应用状态保持**:备用 Master 通过 ZooKeeper 恢复集群状态。 3. **ZooKeeper 协调**:快速选举新 Master 并同步状态。 4. **容错机制**:任务可在其他 Executor 上重新调度。 这些特性保证了集群在 Master 故障时仍能正常运行。
|
11月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
114 0
|
11月前
|
网络协议 测试技术 网络安全
Python编程-Socket网络编程
Python编程-Socket网络编程
113 0
|
网络协议 开发者 Python
深度探索Python Socket编程:从理论到实践,进阶篇带你领略网络编程的魅力!
【7月更文挑战第25天】在网络编程中, Python Socket编程因灵活性强而广受青睐。本文采用问答形式深入探讨其进阶技巧。**问题一**: Socket编程基于TCP/IP,通过创建Socket对象实现通信,支持客户端和服务器间的数据交换。**问题二**: 提升并发处理能力的方法包括多线程(适用于I/O密集型任务)、多进程(绕过GIL限制)和异步IO(asyncio)。**问题三**: 提供了一个使用asyncio库实现的异步Socket服务器示例,展示如何接收及响应客户端消息。通过这些内容,希望能激发读者对网络编程的兴趣并引导进一步探索。
162 4
|
开发者 Python
Python Socket编程:不只是基础,更有进阶秘籍,让你的网络应用飞起来!
【7月更文挑战第25天】在网络应用蓬勃发展的数字时代,Python凭借其简洁的语法和强大的库支持成为开发高效应用的首选。本文通过实时聊天室案例,介绍了Python Socket编程的基础与进阶技巧,包括服务器与客户端的建立、数据交换等基础篇内容,以及使用多线程和异步IO提升性能的进阶篇。基础示例展示了服务器端监听连接请求、接收转发消息,客户端连接服务器并收发消息的过程。进阶部分讨论了如何利用Python的`threading`模块和`asyncio`库来处理多客户端连接,提高应用的并发处理能力和响应速度。掌握这些技能,能使开发者在网络编程领域更加游刃有余,构建出高性能的应用程序。
112 3
|
网络协议 Python
网络世界的建筑师:Python Socket编程基础与进阶,构建你的网络帝国!
【7月更文挑战第26天】在网络的数字宇宙中,Python Socket编程是开启网络世界大门的钥匙。本指南将引领你从基础到实战,成为网络世界的建筑师。
162 2

热门文章

最新文章