这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;
但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;
最后想说一句君子不隐其短,不知则问,不能则学。
如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)
一、SparkStreaming概述
SparkStreaming是什么
SparkStreaming用于流式数据的处理。
(1)SparkS支持的数据输入源很多,例如kafka、Flume、HDFS等。
(2)数据输入可以用Spark的高度抽象原语如:map、Reduce、join、Window等进行运算
(3)而且结果也能保存在很多地方,例如HDFS、数据库等。
采集数据应该从右往左,因为右边的数据先到
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。所以简单来将,DStream就是对RDD在实时数据处理场景的一种封装。
Spark Streaming架构
SparkStreaming架构图
背压机制
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
SparkStreaming特点
1、易用
2、容错
3、易整合到Spark体系
二、DStream入门
2.1 WordCount案例实操
1、需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.12</artifactId> <version>3.5.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.5.0</version> </dependency> </dependencies>
java版本
import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; /** * @ClassName Test01_wordCount * @Description TODO * @Author Zouhuiming * @Date 2023/7/3 20:49 * @Version 1.0 */ public class Test01_wordCount { public static void main(String[] args) throws InterruptedException { //TODO 第一步 创建SparkConf对象 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_wordCount"); //TODO 第二步 创建JavaStreamingContext对象,并设置批次时间 JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L)); //TODO 第三步 地读取数据开始业务逻辑计算 //1、对接数据源获取数据 JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44444); //2、切分 JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); //3、转换word->(word,1) JavaPairDStream<String, Integer> javaPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s, 1); } }); //4、统计单词个数 JavaPairDStream<String, Integer> reduceByKeyDStream = javaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //5、输出结果 reduceByKeyDStream.print(); //TODO 第四步 启动阻塞进程 ssc.start(); ssc.awaitTermination(); } }
scala版本
package streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Duration, Seconds, StreamingContext} object SparkStreaming01_WordCount { def main(args: Array[String]): Unit = { //SparkCore : SparkContext //SparkSQL : SparkSession //SparkStreaming: StreamingContext //表示环境配置 val sparkConf: SparkConf = { new SparkConf().setAppName("SparkStreaming").setMaster("local[*]") } //表示批量处理的周期(采集周期) val ssc = new StreamingContext(sparkConf, Seconds(3)) // 从端口读取数据 // 将从端口中读取到的一行数据处理成一个String. val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("node2", 9999) val wordsDS: DStream[String] = socketDS.flatMap(_.split(" ")) val wordToOneDS: DStream[(String, Int)] = wordsDS.map((_, 1)) val resutDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_ + _) resutDS.print() //启动采集器(接收器) ssc.start() //等待采集器的结束 ssc.awaitTermination() //由于SparkStreaming采集器是长期执行的任务,所以不能关闭 //如果main方法执行完毕,应用程序会自动结束,所以不能让main执行完毕 //ssc.stop() } }
启动程序并通过netcat发送数据(nc 再启动IEDA程序):
WordCount解析
在SparkStreaming中,DataStream是基础抽象,代表这数据流和经过算子计算的结果流。SparkStreaming仍然是基于批处理的思想来处理流式数据的,在内部实现上,将每一批次的数据疯转为一个RDD,DStream就是一系列RDD的封装,接下来就是Spark引擎来对这些RDD进行转换。DStream中批次与批次之间计算相互独立。
DStream创建
RDD队列
用法及说明
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
案例实操
需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
package streaming import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object SpariStreaming02_DStream_queue { def main(args: Array[String]): Unit = { //SparkCore : SparkContext //SparkSQL : SparkSession //SparkStreaming: StreamingContext //ssc表示批量处理的周期,采集周期 val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]") val ssc = new StreamingContext(sparkConf, Seconds(3)) val rddQueue= new mutable.Queue[RDD[Int]]() val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false) val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1)) val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _) reducedStream.print() //启动采集器(接收器) ssc.start() for (elem <- 1 to 5) { rddQueue+= ssc.sparkContext.makeRDD(1 to 300,10) Thread.sleep(2000) } //等待采集器的结束 ssc.awaitTermination() //ssc.stop() } }