喜大普奔!破百了!
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
Spark Streaming 基础数据源
文件流、Socket流、RDD队列流
引入依赖、Java编写多种流进行测试
DStream 转换
DStream上的操作与RDD类似,分为Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的方法,如:
- updateStateByKey
- transform
- window相关操作
- map(func)
对 DStream 中的每个元素应用 func 函数,并返回一个新的 DStream。
例如,将每个记录转换为其长度。
示例:val lengths = lines.map(line => line.length)
flatMap(func)
对 DStream 中的每个元素应用 func 函数,并将结果展平(即将集合的集合展开)。
例如,将每一行文本拆分为单词。
示例:val words = lines.flatMap(line => line.split(" "))
filter(func)
对 DStream 中的每个元素应用 func 函数,并保留返回值为 true 的元素。
例如,过滤掉长度小于 5 的单词。
示例:val filteredWords = words.filter(word => word.length > 5)
reduceByKey(func)
对键值对 DStream 进行聚合操作,对具有相同键的元素应用 func 函数。
例如,计算每个单词的总数。
示例:val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
groupByKey()
对键值对 DStream 中的每个键进行分组,并将具有相同键的值聚合到一个列表中。
示例:val grouped = pairs.groupByKey()
count()
统计 DStream 中每个 RDD 的元素个数。
示例:val count = words.count()
countByValue()
统计 DStream 中每个 RDD 中每个值的出现次数。
示例:val valueCounts = words.countByValue()
union(otherDStream)
将两个 DStream 合并为一个新的 DStream,包含两个 DStream 中的所有元素。
示例:val mergedStream = stream1.union(stream2)
join(otherDStream)
对两个键值对 DStream 进行连接操作,类似 SQL 中的 JOIN 操作。
示例:val joinedStream = stream1.join(stream2)
备注:
在DStream与RDD上的转换操作非常类似(无状态操作)
DStream有自己特殊的操作(窗口操作、追踪状态变化操作)
在DStream上的转换操作比RDD上的转换操作少
DStream 的转换操作可以分为 无状态(stateless)和 有状态(stateful)两种:
无状态转换操作,每个批次的处理不依赖与之前批次的数据,常见的RDD转化操作,例如:map、Filter、reduceByKey等
有状态转换操作,需要使用之前批次的数据或者是中间结果来计算当前批次的数据,有状态转换操作包括:基于滑动窗口的转换操作或追踪状态变化的转化操作
无状态转换
无状态转换操作就是把简单的RDD转换操作应用到每个批次上,也就是转换DStream中的每一个RDD。
常见的无状态转换包括:
map
flatMap
repartition
reduceByKey
groupByKey
重要的转换操作:transform,通过对源DStream的每个RDD应用RDD-To-RDD函数,创建一个新的DStream,支持在新的DStream中任何RDD操作。
这是一个功能强大的函数,它可以允许开发者直接操作其内部的RDD,也就是说开发者,可以任意提供一个RDDToRDD的函数,这个函数在数据流每个批次中都被调用,生成一个新的流。
案例1 黑名单过滤
假设:arr1为黑名单数据(自定义),true表示数据生效,需要被过滤掉;false表示数据 未生效 val arr1 = Array(("spark", true), ("scala", false)) 假设:流式数据格式为"time word",需要根据黑名单中的数据对流式数据执行过滤操 作。如"2 spark"要被过滤掉 1 hadoop 2 spark 3 scala 4 java 5 hive 结果:"2 spark" 被过滤