Spark修炼之道(进阶篇)——Spark入门到精通:第十一节 Spark Streaming—— DStream Transformation操作

简介: 本节主要内容本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.htmlDStream Transformation操作1. Transformation操作Transformation Meaningmap(func) 对DStrea

本节主要内容

本节部分内容来自官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

  1. DStream Transformation操作

1. Transformation操作

Transformation Meaning
map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.
flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count() 通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func) 通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream

具体示例:

    //读取本地文件~/streaming文件夹
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(x => (x, 1))
    val wordCounts=wordMap.reduceByKey(_ + _)
    val filteredWordCounts=wordCounts.filter(_._2>1)
    val numOfCount=filteredWordCounts.count()
    val countByValue=words.countByValue()
    val union=words.union(word1)
    val transform=words.transform(x=>x.map(x=>(x,1)))
    //显式原文件
    lines.print()
    //打印flatMap结果
    words.print()
    //打印map结果
    wordMap.print()
    //打印reduceByKey结果
    wordCounts.print()
    //打印filter结果
    filteredWordCounts.print()
    //打印count结果
    numOfCount.print()
    //打印countByValue结果
    countByValue.print()
    //打印union结果
    union.print()
    //打印transform结果
    transform.print()

下面的代码是运行时添加的文件内容

root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt

下面是前面各个函数的结果

-------------------------------------------
lines.print()
-------------------------------------------
A B C D
A B

-------------------------------------------
flatMap结果
-------------------------------------------
A
B
C
D
A
B

-------------------------------------------
map结果
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)

-------------------------------------------
reduceByKey结果
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)


-------------------------------------------
filter结果
-------------------------------------------
(B,2)
(A,2)

-------------------------------------------
count结果
-------------------------------------------
2

-------------------------------------------
countByValue结果
-------------------------------------------
(B,2)
(D,1)
(A,2)
(C,1)

-------------------------------------------
union结果
-------------------------------------------
A
B
C
D
A
B
A
B
C
D
...

-------------------------------------------
transform结果
-------------------------------------------
(A,1)
(B,1)
(C,1)
(D,1)
(A,1)
(B,1)

示例2:
上节课中演示的WordCount代码并没有只是对输入的单词进行分开计数,没有记录前一次计数的状态,如果想要连续地进行计数,则可以使用updateStateByKey方法来进行。下面的代码主要给大家演示如何updateStateByKey的方法,

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

   //函数字面量,输入的当前值与前一次的状态结果进行累加
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

     //输入类型为K,V,S,返回值类型为K,S
     //V对应为带求和的值,S为前一次的状态
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")

    //每一秒处理一次
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    //当前目录为checkpoint结果目录,后面会讲checkpoint在Spark Streaming中的应用
    ssc.checkpoint(".")

    //RDD的初始化结果
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))


    //使用Socket作为输入源,本例ip为localhost,端口为9999
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    //flatMap操作
    val words = lines.flatMap(_.split(" "))
    //map操作
    val wordDstream = words.map(x => (x, 1))

    //updateStateByKey函数使用
    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
      new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

下图是初始时的值:
这里写图片描述
使用下列命令启动netcat server

root@sparkmaster:~/streaming# nc -lk 9999

然后输入

root@sparkmaster:~/streaming# nc -lk 9999
hello

将得到下图的结果

这里写图片描述

然后再输入world,

root@sparkmaster:~/streaming# nc -lk 9999
hello
world

则将得到下列结果
这里写图片描述

目录
相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
56 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
115 0
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
73 0
|
3月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
64 0
|
3月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
64 0
|
分布式计算 Spark
spark transformation与action操作函数
一、Transformation map(func) 返回一个新的分布式数据集,由每个原元素经过函数处理后的新元素组成 filter(func) 返回一个新的数据集,经过fun函数处理后返回值为true的原元素组成 flatMap(func) 类似于map,但每个输入元素会被映射为0个或多个输...
1029 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
192 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
82 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
122 6