SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)

简介: SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkStreaming概述

SparkStreaming是什么

SparkStreaming用于流式数据的处理。

(1)SparkS支持的数据输入源很多,例如kafka、Flume、HDFS等。

(2)数据输入可以用Spark的高度抽象原语如:map、Reduce、join、Window等进行运算

(3)而且结果也能保存在很多地方,例如HDFS、数据库等。

image.png

image.png

采集数据应该从右往左,因为右边的数据先到

image.png

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。所以简单来将,DStream就是对RDD在实时数据处理场景的一种封装。

Spark Streaming架构

image.png

SparkStreaming架构图

image.png

背压机制

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

SparkStreaming特点

1、易用

2、容错

3、易整合到Spark体系

二、DStream入门

2.1 WordCount案例实操

1、需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数。

image.png

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.5.0</version>
    </dependency>
</dependencies>

java版本

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @ClassName Test01_wordCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 20:49
 * @Version 1.0
 */
public class Test01_wordCount {
    public static void main(String[] args) throws InterruptedException {
        //TODO 第一步 创建SparkConf对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_wordCount");

        //TODO 第二步 创建JavaStreamingContext对象,并设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));
        //TODO 第三步 地读取数据开始业务逻辑计算
        //1、对接数据源获取数据
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44444);

        //2、切分
        JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //3、转换word->(word,1)
        JavaPairDStream<String, Integer> javaPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });

        //4、统计单词个数
        JavaPairDStream<String, Integer> reduceByKeyDStream = javaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //5、输出结果
        reduceByKeyDStream.print();

        //TODO 第四步 启动阻塞进程
        ssc.start();
        ssc.awaitTermination();
    }
}

scala版本

package streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object SparkStreaming01_WordCount {
  def main(args: Array[String]): Unit = {
    //SparkCore : SparkContext
    //SparkSQL  : SparkSession
    //SparkStreaming: StreamingContext
    //表示环境配置
    val sparkConf: SparkConf = {
    new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")

    }
    //表示批量处理的周期(采集周期)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 从端口读取数据
    // 将从端口中读取到的一行数据处理成一个String.
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("node2", 9999)

    val wordsDS: DStream[String] = socketDS.flatMap(_.split(" "))

    val wordToOneDS: DStream[(String, Int)] = wordsDS.map((_, 1))

    val resutDS: DStream[(String, Int)] = wordToOneDS.reduceByKey(_ + _)

    resutDS.print()

    //启动采集器(接收器)
    ssc.start()
    //等待采集器的结束
    ssc.awaitTermination()
     //由于SparkStreaming采集器是长期执行的任务,所以不能关闭
      //如果main方法执行完毕,应用程序会自动结束,所以不能让main执行完毕
    //ssc.stop()

  }

}

启动程序并通过netcat发送数据(nc 再启动IEDA程序):

image.png

WordCount解析

在SparkStreaming中,DataStream是基础抽象,代表这数据流和经过算子计算的结果流。SparkStreaming仍然是基于批处理的思想来处理流式数据的,在内部实现上,将每一批次的数据疯转为一个RDD,DStream就是一系列RDD的封装,接下来就是Spark引擎来对这些RDD进行转换。DStream中批次与批次之间计算相互独立。

image.png

DStream创建

RDD队列

用法及说明

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实操

需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

package streaming
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
object SpariStreaming02_DStream_queue {
  def main(args: Array[String]): Unit = {

    //SparkCore : SparkContext
    //SparkSQL  : SparkSession
    //SparkStreaming: StreamingContext
  //ssc表示批量处理的周期,采集周期
    val sparkConf: SparkConf =
    new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")

    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val rddQueue= new mutable.Queue[RDD[Int]]()
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
    val mappedStream: DStream[(Int, Int)] = inputStream.map((_, 1))
    val reducedStream: DStream[(Int, Int)] = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    //启动采集器(接收器)
    ssc.start()
    for (elem <- 1 to 5) {
      rddQueue+= ssc.sparkContext.makeRDD(1 to 300,10)
      Thread.sleep(2000)
    }
    //等待采集器的结束
    ssc.awaitTermination()

    //ssc.stop()
  }





}

image.png

相关文章
|
2月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
47 0
|
2月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
98 1
|
1月前
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如果设置了从Kafka数据源同步到MaxCompute(mc)的任务,任务一直在执行中,是什么原因
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
30 10
|
2月前
|
消息中间件 存储 Kafka
Kafka【基础入门】
Kafka【基础入门】
42 1
|
2月前
|
消息中间件 Java Kafka
MQ产品使用合集之对于Kafka作为数据源的情况,官方比较推荐哪种使用方式
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
54 0
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0
|
19天前
|
Java
使用kafka-clients操作数据(java)
使用kafka-clients操作数据(java)
19 6