【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)

简介: 【大数据技术Spark】DStream编程操作讲解实战(图文解释 附源码)

DStream编程

批处理引擎Spark Core把输入的数据按照一定的时间片(如1s)分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将DStream操作转换为RDD算子的相关操作,即转换操作、窗口操作以及输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。

转换操作

1:无状态转换操作

无状态转化操作每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作,例如 Map()、filter()、ReduceByKey() 等,都是无状态转化操作。

2:有状态转化操作:

有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

DStream API提供的与窗口操作相关的方法

DStream API提供的与输出操作相关的方法

编写Spark Streaming程序的基本步骤是:

1)通过创建输入DStream来定义输入源。

2)通过对DStream应用转换操作和输出操作来定义流计算。

3)用streamingContext.start()来开始接收数据和处理流程

4)通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)。

5)可以通过streamingContext.stop()来手动结束流计算进程。

通过示例进行演示,此示例为监视一个文件夹的log日志,并计算每个单词出现的次数

cogroup和join算子需要两个并行数据流,对两个数据流直接关联,不同的是join算子是把两个RDD按照相同的key拼在一起,类似SQL中的等值连接,可以类似的使用其他算子进行RDD的左连接等,而cogroup算子是把两个RDD按照key拼起来,但是它会汇总得到的value,最后的结果的条数是根据key决定的,有多少key就汇总成多少条数据,然后把RDD的所有相同的key的value放到一个Iterable里面,类似于SQL里面的全连接

设置为本地运行模式,2个线程,一个监听,另一个处理数据
    val sparkConf = new SparkConf().setAppName("WordCountStreaming").setMaster("local[2]")
    // 时间间隔为20秒
    val stc = new StreamingContext(sparkConf, Seconds(20))
    //定义输入源,监听本地目录,也可以采用HDFS文件
    val lines = stc.textFileStream("E:/log")
    //应用转换操作flatMap流计算
    val words = lines.flatMap(_.split(" "))
    //应用转换操作Map和ReduceByKey计算
    val wordCounts = words.Map(x => (x, 1)).ReduceByKey(_ + _)
    wordCounts.print()
    //开始接收数据和处理流程
    stc.start()
    //等待处理结束
    stc.awaitTermination()
//创建两个可被并行操作的分布式数据集
    val idName = sc.parallelize(Array((1, "张三"), (2, "李四"), (3, "王五")))
    val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
    println("\ncogroup\n")
    //对两个并行数据集进行cogroup操作
    idName.cogroup(idAge).collect().foreach(println)
    println("\njoin\n")
    //对两个并行数据集进行join操作
     idName.join(idAge).collect().foreach(println)
 //3.获取StreamingContext对象,5秒一个批次
    val ssc = new StreamingContext(sparkContext,Seconds(5))
    //4.接收socket的数据
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("hhaonote",9999)
    //5.获取每一行的单词
    val words: DStream[String] = textStream.flatMap(_.split(" "))
    //6.为每一个单词置为1
    val wordAndOne: DStream[(String, Int)] = words.Map((_,1))
     //7.每隔10秒统计最近10秒的搜索词出现的次数
    val result: DStream[(String, Int)] = wordAndOne.ReduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(10))
    //8.打印
    result.print()

创作不易 觉得有帮助请点赞关注收藏~~~

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
6月前
|
存储 人工智能 大数据
云栖2025|阿里云开源大数据发布新一代“湖流一体”数智平台及全栈技术升级
阿里云在云栖大会发布“湖流一体”数智平台,推出DLF-3.0全模态湖仓、实时计算Flink版升级及EMR系列新品,融合实时化、多模态、智能化技术,打造AI时代高效开放的数据底座,赋能企业数字化转型。
1245 0
|
8月前
|
数据采集 人工智能 分布式计算
ODPS在AI时代的发展战略与技术演进分析报告
ODPS(现MaxCompute)历经十五年发展,从分布式计算平台演进为AI时代的数据基础设施,以超大规模处理、多模态融合与Data+AI协同为核心竞争力,支撑大模型训练与实时分析等前沿场景,助力企业实现数据驱动与智能化转型。
553 4
|
6月前
|
数据可视化 大数据 关系型数据库
基于python大数据技术的医疗数据分析与研究
在数字化时代,医疗数据呈爆炸式增长,涵盖患者信息、检查指标、生活方式等。大数据技术助力疾病预测、资源优化与智慧医疗发展,结合Python、MySQL与B/S架构,推动医疗系统高效实现。
|
8月前
|
SQL 分布式计算 大数据
我与ODPS的十年技术共生之路
ODPS十年相伴,从初识的分布式计算到共生进化,突破架构边界,推动数据价值深挖。其湖仓一体、隐私计算与Serverless能力,助力企业降本增效,赋能政务与商业场景,成为数字化转型的“数字神经系统”。
|
8月前
|
存储 人工智能 算法
Java 大视界 -- Java 大数据在智能医疗影像数据压缩与传输优化中的技术应用(227)
本文探讨 Java 大数据在智能医疗影像压缩与传输中的关键技术应用,分析其如何解决医疗影像数据存储、传输与压缩三大难题,并结合实际案例展示技术落地效果。
|
8月前
|
机器学习/深度学习 算法 Java
Java 大视界 -- Java 大数据在智能物流运输车辆智能调度与路径优化中的技术实现(218)
本文深入探讨了Java大数据技术在智能物流运输中车辆调度与路径优化的应用。通过遗传算法实现车辆资源的智能调度,结合实时路况数据和强化学习算法进行动态路径优化,有效提升了物流效率与客户满意度。以京东物流和顺丰速运的实际案例为支撑,展示了Java大数据在解决行业痛点问题中的强大能力,为物流行业的智能化转型提供了切实可行的技术方案。
|
9月前
|
数据采集 自然语言处理 分布式计算
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
大数据岗位技能需求挖掘:Python爬虫与NLP技术结合
|
7月前
|
机器学习/深度学习 传感器 分布式计算
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
数据才是真救命的:聊聊如何用大数据提升灾难预警的精准度
502 14
|
9月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
334 4
|
8月前
|
机器学习/深度学习 运维 监控
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
运维不怕事多,就怕没数据——用大数据喂饱你的运维策略
672 0