【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)

简介: 【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)

DStream编程

批处理引擎Spark Core把输入的数据按照一定的时间片(如1s)分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将DStream操作转换为RDD算子的相关操作,即转换操作、窗口操作以及输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。

转换操作

1:无状态转换操作

无状态转化操作每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作,例如 Map()、filter()、ReduceByKey() 等,都是无状态转化操作。

2:有状态转化操作:

有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

DStream API提供的与窗口操作相关的方法

DStream API提供的与输出操作相关的方法

编写Spark Streaming程序的基本步骤是:

1)通过创建输入DStream来定义输入源。

2)通过对DStream应用转换操作和输出操作来定义流计算。

3)用streamingContext.start()来开始接收数据和处理流程

4)通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。

5)可以通过streamingContext.stop()来手动结束流计算进程。

通过示例进行演示,此示例为监视一个文件夹的log日志,并计算每个单词出现的次数

cogroup和join算子需要两个并行数据流,对两个数据流直接关联,不同的是join算子是把两个RDD按照相同的key拼在一起,类似SQL中的等值连接,可以类似的使用其他算子进行RDD的左连接等,而cogroup算子是把两个RDD按照key拼起来,但是它会汇总得到的value,最后的结果的条数是根据key决定的,有多少key就汇总成多少条数据,然后把RDD的所有相同的key的value放到一个Iterable里面,类似于SQL里面的全连接

设置为本地运行模式,2个线程,一个监听,另一个处理数据
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
    // 时间间隔为20秒
    val stc = new StreamingContext(sparkConf, Seconds(20))
    //定义输入源,监听本地目录,也可以采用HDFS文件
    val lines = stc.textFileStream("E:/log")
    //应用转换操作flatMap流计算
    val words = lines.flatMap(_.split(" "))
    //应用转换操作Map和ReduceByKey计算
    val wordCounts = words.Map(x => (x, 1)).ReduceByKey(_ + _)
    wordCounts.print()
    //开始接收数据和处理流程
    stc.start()
    //等待处理结束
    stc.awaitTermination()
//创建两个可被并行操作的分布式数据集
    val idName = sc.parallelize(Array((1, "张三"), (2, "李四"), (3, "王五")))
    val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
    println("\ncogroup\n")
    //对两个并行数据集进行cogroup操作
    idName.cogroup(idAge).collect().foreach(println)
    println("\njoin\n")
    //对两个并行数据集进行join操作
     idName.join(idAge).collect().foreach(println)
 //3.获取StreamingContext对象,5秒一个批次
    val ssc = new StreamingContext(sparkContext,Seconds(5))
    //4.接收socket的数据
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hhaonote",9999)
    //5.获取每一行的单词
    val words: DStream[String] = textStream.flatMap(_.split(" "))
    //6.为每一个单词置为1
    val wordAndOne: DStream[(String, Int)] = words.Map((_,1))
     //7.每隔10秒统计最近10秒的搜索词出现的次数
    val result: DStream[(String, Int)] = wordAndOne.ReduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(10))
    //8.打印
    result.print()

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
9月前
|
存储 SQL 监控
数据中台架构解析:湖仓一体的实战设计
在数据量激增的数字化时代,企业面临数据分散、使用效率低等问题。数据中台作为统一管理与应用数据的核心平台,结合湖仓一体架构,打通数据壁垒,实现高效流转与分析。本文详解湖仓一体的设计与落地实践,助力企业构建统一、灵活的数据底座,驱动业务决策与创新。
|
11月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
11月前
|
存储 SQL 分布式计算
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
别让你的数据“裸奔”!大数据时代的数据隐私保护实战指南
631 19
|
10月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
522 0
|
11月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
640 79
|
9月前
|
数据采集 分布式计算 大数据
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
不会Python,还敢说搞大数据?一文带你入门大数据编程的“硬核”真相
200 1
|
SQL 缓存 数据处理
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
Apache Doris 提出“数据无界”和“湖仓无界”理念,提供高效的数据管理方案。本文聚焦三个典型应用场景:湖仓分析加速、多源联邦分析、湖仓数据处理,深入介绍 Apache Doris 的最佳实践,帮助企业快速响应业务需求,提升数据处理和分析效率
835 3
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
|
存储 分布式计算 大数据
基于阿里云大数据平台的实时数据湖构建与数据分析实战
在大数据时代,数据湖作为集中存储和处理海量数据的架构,成为企业数据管理的核心。阿里云提供包括MaxCompute、DataWorks、E-MapReduce等在内的完整大数据平台,支持从数据采集、存储、处理到分析的全流程。本文通过电商平台案例,展示如何基于阿里云构建实时数据湖,实现数据价值挖掘。平台优势包括全托管服务、高扩展性、丰富的生态集成和强大的数据分析工具。