Structured Streaming架构原理详解!

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 笔记

一、Structured Streaming概述


Structured Streaming是一个基于sparksql引擎开发的可伸展和容错的流处理引擎。Structured Streaming传输中的关键思想是将实时数据流视为被连续添加的表。

这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark在无界输入表上将其作为增量查询运行。


程式设计模型

将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。1.png

对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。

2.png

“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:


Complete Mode:整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。

Append Mode:仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。

Update Mode:仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。3.png


二、Structured Streaming与Socket集成


以complete输出为例

[caizhengjie@node1 kafka]$ nc -lk 9999
java java java 
python java 
java java python java hive hove
hbase
hbase hbase hive python java
package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 3:36 下午
 */
object StructuredStreamingTest {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 创建一个流数据框架
        val lines = spark.readStream
                .format("socket")
                .option("host","node1")
                .option("port",9999)
                .load() // 返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val words = lines.as[String].flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果

4.png


三、Structured Streaming与Kafka集成

Kafka 0.10的结构化流集成,可从Kafka读取数据或向Kafka写入数据。


(1)通过IDEA工具

对于使用Maven项目定义的Scala 应用程序,需要加载pom.xml配置文件

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${saprk.version}</version>
</dependency>

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
>java java python python
>hive hive java java 

运行程序

package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 4:49 下午
 */
object StructuredStreamingKafka {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 读取kafka stream流
        val df = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "node1:9092")
                .option("subscribe", "test")
                .load() //返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val lines = df.selectExpr("CAST(value AS STRING)").as[String]
        val words =  lines.flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果:

5.png

(2)通过spark-shell

在通过spark-shell运行时,需要将下面的jar包拷贝到spark的jar目录下

6.png

kafka_2.11-2.1.1.jar和kafka-clients-2.1.1.jar在kafka的lib中能找到

spark-sql-kafka-0-10_2.11-2.4.6.jar和spark-streaming-kafka-0-10_2.11-2.4.6.jar需要到maven的仓库下找

7.png

image.png

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test

运行spark-shell

bin/spark-shell --master local[2]
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "test")
.load()
import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words =  lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出
val query = wordCounts.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
// Exiting paste mode, now interpreting.

查看运行结果:

9.png


四、Structured Streaming与MySQL集成


关于Structured Streaming与MySQL集成可以见文档:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


通常,我们希望能够将流的输出写入外部数据库(例如MySQL)。在撰写本文时,结构化流API不支持将外部数据库作为接收器。但是,这样做的话,API选项将像一样简单.format(“jdbc”).start(“jdbc:mysql/…”)。同时,我们可以使用foreach接收器来完成此任务。让我们创建一个自定义JDBC Sink,它扩展了ForeachWriter并实现了其方法。10.png

我们现在就可以使用我们的JDBCSink了:

11.png

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

12.png


相关文章
|
4天前
|
容器
Flutter&鸿蒙next 布局架构原理详解
Flutter&鸿蒙next 布局架构原理详解
|
15天前
|
前端开发 Java 应用服务中间件
21张图解析Tomcat运行原理与架构全貌
【10月更文挑战第2天】本文通过21张图详细解析了Tomcat的运行原理与架构。Tomcat作为Java Web开发中最流行的Web服务器之一,其架构设计精妙。文章首先介绍了Tomcat的基本组件:Connector(连接器)负责网络通信,Container(容器)处理业务逻辑。连接器内部包括EndPoint、Processor和Adapter等组件,分别处理通信、协议解析和请求封装。容器采用多级结构(Engine、Host、Context、Wrapper),并通过Mapper组件进行请求路由。文章还探讨了Tomcat的生命周期管理、启动与停止机制,并通过源码分析展示了请求处理流程。
|
13天前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
32 3
|
13天前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
17 2
|
13天前
|
消息中间件 监控 Java
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
大数据-109 Flink 体系结构 运行架构 ResourceManager JobManager 组件关系与原理剖析
31 1
|
13天前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
27 0
|
16天前
|
存储 网络协议 Unix
docker的底层原理一:客户端-服务器架构
本文详细解释了Docker的客户端-服务器架构,包括常驻后台的Docker守护进程、通过命令行接口发送请求的Docker客户端、以及它们之间通过Unix socket或网络接口进行的通信。
12 0
|
1月前
|
存储 SQL Cloud Native
Hologres 的架构设计与工作原理
【9月更文第1天】随着大数据时代的到来,实时分析和处理数据的需求日益增长。传统的数据仓库在处理大规模实时数据分析时逐渐显露出性能瓶颈。为了解决这些问题,阿里巴巴集团研发了一款名为 Hologres 的新型云原生交互式分析数据库。Hologres 能够支持 SQL 查询,并且能够实现实时的数据写入和查询,这使得它成为处理大规模实时数据的理想选择。
78 2
|
1月前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
存储 分布式计算 Hadoop
ChunkServer 原理与架构详解
【8月更文第30天】在分布式文件系统中,ChunkServer 是一个重要的组件,负责存储文件系统中的数据块(chunks)。ChunkServer 的设计和实现对于确保数据的高可用性、一致性和持久性至关重要。本文将深入探讨 ChunkServer 的核心原理和内部架构设计,并通过代码示例来说明其实现细节。
69 1