大数据Spark Streaming入门

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据Spark Streaming入门

1 官方案例运行

SparkStreaming官方提供Example案例,功能描述:从TCP Socket数据源实时消费数据,对每批次Batch数据进行词频统计WordCount,流程图如下:

1、数据源:TCP Socket从哪里读取实时数据,然后进行实时分析
2、数据终端:输出控制台结果数据输出到哪里
3、功能:对每批次数据实时统计,时间间隔BatchInterval:1s

文档: http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#a-quick-example

运行官方提供案例,使用【$SPARK_HOME/bin/run-example】命令运行,效果如下:

具体步骤如下:

  • 第一步、准备数据源启动端口,准备数据
nc -lk 9999
spark spark hive hadoop spark hive
  • 第二步、运行官方案例
  1. 使用官方提供命令行运行案例
# 官方入门案例运行:词频统计
/export/server/spark/bin/run-example --master local[2] streaming.NetworkWordCount node1.oldlu.cn 9999
  • 第三步、运行结果

这个时间为时间戳

SparkStreaming模块对流式数据处理,介于Batch批处理和RealTime实时处理之间处理数据方式。

2 编程实现

基于IDEA集成开发环境,编程实现:从TCP Socket实时读取流式数据,对每批次中数据进行

词频统计WordCount。

2.1 StreamingContext

回顾SparkCore和SparkSQL及SparkStreaming处理数据时编程:


1)、SparkCore

数据结构:RDD

SparkContext:上下文实例对象

2)、SparkSQL

数据结构:Dataset/DataFrame = RDD + Schema

SparkSession:会话实例对象, 在Spark 1.x中SQLContext/HiveContext

3)、SparkStreaming

数据结构:DStream = Seq[RDD]

StreamingContext:流式上下文实例对象,底层还是SparkContext

参数:划分流式数据时间间隔BatchInterval:1s,5s(演示)

文档: http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#initializing-streamingcontext

从官方文档可知,提供两种方式构建StreamingContext实例对象,截图如下:

第一种方式:构建SparkConf对象

  • 第二种方式:构建SparkContext对象

2.2 编写代码

针对SparkStreaming流式应用来说,代码逻辑大致如下五个步骤:

1、Define the input sources by creating input DStreams.定义从哪个数据源接收流式数据,封装到DStream中
2、Define the streaming computations by applying transformation and output operations to DStreams.针对业务调用DStream中函数,进行数据处理和输出
3、Start receiving data and processing it using streamingContext.start().
4 、 Wait for the processing to be stopped (manually or due to any error) usingstreamingContext.awaitTermination().
5、The processing can be manually stopped using streamingContext.stop().启动流式应用,并且一直等待程序终止(人为或异常),最后停止运行

完整StreamingWordCount代码如下所示:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 基于IDEA集成开发环境,编程实现从TCP Socket实时读取流式数据,对每批次中数据进行词频统计。
 */
object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // TODO: 1. 构建StreamingContext流式上下文实例对象
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
      // b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回
      context
    }
    // TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
    /*
    def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String]
    */
    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
      "node1.oldlu.cn", 9999
    )
    // TODO: 3. 对每批次的数据进行词频统计
    val resultDStream: DStream[(String, Int)] = inputDStream
      // 过滤不合格的数据
      .filter(line => null != line && line.trim.length > 0)
      // 按照分隔符划分单词
      .flatMap(line => line.trim.split("\\s+"))
      // 转换数据为二元组,表示每个单词出现一次
      .map(word => (word, 1))
      // 按照单词分组,聚合统计
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
    resultDStream.print(10)
    // TODO: 5. 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

运行结果监控截图:

2.3 Streaming 应用监控

运行上述词频统计案例,登录到WEB UI监控页面:http://localhost:4040,查看相关监控信息。

  • 其一、Streaming流式应用概要信息
  • 每批次Batch数据处理总时间TD = 批次调度延迟时间SD + 批次数据处理时间PT。
  • 其二、性能衡量标准
    SparkStreaming实时处理数据性能如何(是否可以实时处理数据)??如何衡量的呢??
每批次数据处理时间TD <= BatchInterval每批次时间间隔

3 Streaming 工作原理

SparkStreaming处理流式数据时,按照时间间隔划分数据为微批次(Micro-Batch),每批次数据当做RDD,再进行处理分析。

以上述词频统计WordCount程序为例,讲解Streaming工作原理。

3.1 创建 StreamingContext

当SparkStreaming流式应用启动(streamingContext.start)时,首先创建StreamingContext流式上下文实例对象,整个流式应用环境构建,底层还是SparkContext。

当StreamingContext对象构建以后,启动接收器Receiver,专门从数据源端接收数据,此接收器作为Task任务运行在Executor中,一直运行(Long Runing),一直接收数据。

从WEB UI界面【Jobs Tab】可以看到【Job-0】是一个Receiver接收器,一直在运行,以Task方式运行,需要1Core CPU。

可以从多个数据源端实时消费数据进行处理,例如从多个TCP Socket接收数据,对每批次数据

进行词频统计,使用DStream#union函数合并接收数据流,演示代码如下:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 从TCP Socket 中读取数据,对每批次(时间为5秒)数据进行词频统计,将统计结果输出到控制台。
 * TODO: 从多个Socket读取流式数据,进行union合并
 */
object StreamingDStreamUnion {
  def main(args: Array[String]): Unit = {
    // TODO: 1. 构建StreamingContext流式上下文实例对象
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[4]")
      // b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回
      context
    }
    // TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
    /*
    def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String]
    */
    val inputDStream01: DStream[String] = ssc.socketTextStream("node1.oldlu.cn", 9999)
    val inputDStream02: DStream[String] = ssc.socketTextStream("node1.oldlu.cn", 9988)
    // 合并两个DStream流
    val inputDStream: DStream[String] = inputDStream01.union(inputDStream02)
    // TODO: 3. 对每批次的数据进行词频统计
    val resultDStream: DStream[(String, Int)] = inputDStream
      // 过滤不合格的数据
      .filter(line => null != line && line.trim.length > 0)
      // 按照分隔符划分单词
      .flatMap(line => line.trim.split("\\s+"))
      // 转换数据为二元组,表示每个单词出现一次
      .map(word => (word, 1))
      // 按照单词分组,聚合统计
      .reduceByKey((tmp, item) => tmp + item)
    // TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
    resultDStream.print(10)
    // TODO: 5. 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

3.2 接收器接收数据

启动每个接收器Receiver以后,实时从数据源端接收数据(比如TCP Socket),也是按照时间间隔将接收的流式数据划分为很多Block(块)。

接收器Receiver划分流式数据的时间间隔BlockInterval,默认值为200ms,通过属性【spark.streaming.blockInterval】设置。接收器将接收的数据划分为Block以后,按照设置的存储级别对Block进行存储,从TCP Socket中接收数据默认的存储级别为:MEMORY_AND_DISK_SER_2,先存储内存,不足再存储磁盘,存储2副本。从TCP Socket消费数据时可以设置Block存储级别,演示代码如下:

// TODO: 2. 从数据源端读取数据,此处是TCP Socket读取数据
/*
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
*/
val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(
  "node1.oldlu.cn", //
  9999, //
  // TODO: 设置Block存储级别为先内存,不足磁盘,副本为1
  storageLevel = StorageLevel.MEMORY_AND_DISK
)

3.3 汇报接收Block报告

接收器Receiver将实时汇报接收的数据对应的Block信息,当BatchInterval时间达到以后,

StreamingContext将对应时间范围内数据block当做RDD,加载SparkContextt处理数据。

以此循环处理流式的数据,如下图所示:

3.4 Streaming 工作原理总述

整个Streaming运行过程中,涉及到两个时间间隔:

  • 批次时间间隔:BatchInterval
  1. 每批次数据的时间间隔,每隔多久加载一个Job;
  • Block时间间隔:BlockInterval
  1. 接收器划分流式数据的时间间隔,可以调整大小哦,官方建议最小值不能小于50ms;

默认值为200ms,属性:spark.streaming.blockInterval,调整设置

官方案例:
BatchInterval: 1s = 1000ms = 5 * BlockInterval
每批次RDD数据中,有5个Block,每个Block就是RDD一个分区数据

从代码层面结合实际数据处理层面来看,Streaming处理原理如下,左边为代码逻辑,右边为

实际每批次数据处理过程。

具体运行数据时,每批次数据依据代码逻辑执行。

// TODO: 3. 对每批次的数据进行词频统计
val resultDStream: DStream[(String, Int)] = inputDStream
  // 过滤不合格的数据
  .filter(line => null != line && line.trim.length > 0)
  // 按照分隔符划分单词
  .flatMap(line => line.trim.split("\\s+"))
  // 转换数据为二元组,表示每个单词出现一次
  .map(word => (word, 1))
  // 按照单词分组,聚合统计
  .reduceByKey((tmp, item) => tmp + item)
// TODO: 4. 将结果数据输出 -> 将每批次的数据处理以后输出
resultDStream.print(10)

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
打赏
0
1
1
0
110
分享
相关文章
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
136 79
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
270 2
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
346 2
ClickHouse与大数据生态集成:Spark & Flink 实战
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
240 1
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
【YashanDB知识库】使用DataX工具迁移yashan数据到maxcompute
本文介绍使用崖山适配的DataX工具进行数据库迁移的方法,包括单表迁移和批量表迁移。单表迁移需配置json文件并执行同步命令;批量迁移则通过脚本自动化生成json配置文件并完成数据迁移,最后提供数据比对功能验证迁移结果。具体步骤涵盖连接信息配置、表清单获取、json文件生成、数据迁移执行及日志记录,确保数据一致性。相关工具和脚本简化了复杂迁移过程,提升效率。
数据让农业更聪明——用大数据激活田间地头
数据让农业更聪明——用大数据激活田间地头
38 2
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
川航选择引入 SelectDB 建设湖仓一体大数据分析引擎,取得了数据导入效率提升 3-6 倍,查询分析性能提升 10-18 倍、实时性提升至 5 秒内等收益。
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
数据的“潘多拉魔盒”:大数据伦理的深度思考
数据的“潘多拉魔盒”:大数据伦理的深度思考
96 25

热门文章

最新文章