Structured Streaming架构原理详解!

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 笔记

一、Structured Streaming概述


Structured Streaming是一个基于sparksql引擎开发的可伸展和容错的流处理引擎。Structured Streaming传输中的关键思想是将实时数据流视为被连续添加的表。

这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark在无界输入表上将其作为增量查询运行。


程式设计模型

将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。1.png

对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。

2.png

“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:


Complete Mode:整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。

Append Mode:仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。

Update Mode:仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。3.png


二、Structured Streaming与Socket集成


以complete输出为例

[caizhengjie@node1 kafka]$ nc -lk 9999
java java java 
python java 
java java python java hive hove
hbase
hbase hbase hive python java
package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 3:36 下午
 */
object StructuredStreamingTest {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 创建一个流数据框架
        val lines = spark.readStream
                .format("socket")
                .option("host","node1")
                .option("port",9999)
                .load() // 返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val words = lines.as[String].flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果

4.png


三、Structured Streaming与Kafka集成

Kafka 0.10的结构化流集成,可从Kafka读取数据或向Kafka写入数据。


(1)通过IDEA工具

对于使用Maven项目定义的Scala 应用程序,需要加载pom.xml配置文件

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${saprk.version}</version>
</dependency>

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
>java java python python
>hive hive java java 

运行程序

package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 4:49 下午
 */
object StructuredStreamingKafka {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 读取kafka stream流
        val df = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "node1:9092")
                .option("subscribe", "test")
                .load() //返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val lines = df.selectExpr("CAST(value AS STRING)").as[String]
        val words =  lines.flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果:

5.png

(2)通过spark-shell

在通过spark-shell运行时,需要将下面的jar包拷贝到spark的jar目录下

6.png

kafka_2.11-2.1.1.jar和kafka-clients-2.1.1.jar在kafka的lib中能找到

spark-sql-kafka-0-10_2.11-2.4.6.jar和spark-streaming-kafka-0-10_2.11-2.4.6.jar需要到maven的仓库下找

7.png

image.png

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test

运行spark-shell

bin/spark-shell --master local[2]
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "test")
.load()
import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words =  lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出
val query = wordCounts.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
// Exiting paste mode, now interpreting.

查看运行结果:

9.png


四、Structured Streaming与MySQL集成


关于Structured Streaming与MySQL集成可以见文档:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


通常,我们希望能够将流的输出写入外部数据库(例如MySQL)。在撰写本文时,结构化流API不支持将外部数据库作为接收器。但是,这样做的话,API选项将像一样简单.format(“jdbc”).start(“jdbc:mysql/…”)。同时,我们可以使用foreach接收器来完成此任务。让我们创建一个自定义JDBC Sink,它扩展了ForeachWriter并实现了其方法。10.png

我们现在就可以使用我们的JDBCSink了:

11.png

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

12.png


相关文章
|
3月前
|
存储 监控 算法
园区导航系统技术架构实现与原理解构
本文聚焦园区导航场景中室内外定位精度不足、车辆调度路径规划低效、数据孤岛难以支撑决策等技术痛点,从架构设计到技术原理,对该系统从定位到数据中台进行技术拆解。
110 0
园区导航系统技术架构实现与原理解构
|
4月前
|
存储 消息中间件 canal
zk基础—2.架构原理和使用场景
ZooKeeper(ZK)是一个分布式协调服务,广泛应用于分布式系统中。它提供了分布式锁、元数据管理、Master选举及分布式协调等功能,适用于如Kafka、HDFS、Canal等开源分布式系统。ZK集群采用主从架构,具有顺序一致性、高性能、高可用和高并发等特点。其核心机制包括ZAB协议(保证数据一致性)、Watcher监听回调机制(实现通知功能)、以及基于临时顺序节点的分布式锁实现。ZK适合小规模集群部署,主要用于读多写少的场景。
|
5月前
|
存储 人工智能 自然语言处理
为什么混合专家模型(MoE)如此高效:从架构原理到技术实现全解析
本文深入探讨了混合专家(MoE)架构在大型语言模型中的应用与技术原理。MoE通过稀疏激活机制,在保持模型高效性的同时实现参数规模的大幅扩展,已成为LLM发展的关键趋势。文章分析了MoE的核心组件,包括专家网络与路由机制,并对比了密集与稀疏MoE的特点。同时,详细介绍了Mixtral、Grok、DBRX和DeepSeek等代表性模型的技术特点及创新。MoE不仅解决了传统模型扩展成本高昂的问题,还展现出专业化与适应性强的优势,未来有望推动AI工具更广泛的应用。
1818 4
为什么混合专家模型(MoE)如此高效:从架构原理到技术实现全解析
|
10月前
|
存储 SQL 关系型数据库
MySQL进阶突击系列(03) MySQL架构原理solo九魂17环连问 | 给大厂面试官的一封信
本文介绍了MySQL架构原理、存储引擎和索引的相关知识点,涵盖查询和更新SQL的执行过程、MySQL各组件的作用、存储引擎的类型及特性、索引的建立和使用原则,以及二叉树、平衡二叉树和B树的区别。通过这些内容,帮助读者深入了解MySQL的工作机制,提高数据库管理和优化能力。
|
6月前
|
消息中间件 存储 设计模式
RocketMQ原理—5.高可用+高并发+高性能架构
本文主要从高可用架构、高并发架构、高性能架构三个方面来介绍RocketMQ的原理。
1557 21
RocketMQ原理—5.高可用+高并发+高性能架构
|
5月前
|
机器学习/深度学习 算法 测试技术
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析
本文探讨了基于图的重排序方法在信息检索领域的应用与前景。传统两阶段检索架构中,初始检索速度快但结果可能含噪声,重排序阶段通过强大语言模型提升精度,但仍面临复杂需求挑战
139 0
图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析
|
10月前
|
人工智能 前端开发 编译器
【AI系统】LLVM 架构设计和原理
本文介绍了LLVM的诞生背景及其与GCC的区别,重点阐述了LLVM的架构特点,包括其组件独立性、中间表示(IR)的优势及整体架构。通过Clang+LLVM的实际编译案例,展示了从C代码到可执行文件的全过程,突显了LLVM在编译器领域的创新与优势。
470 3
|
5月前
|
Java 开发者 Spring
Spring框架 - 深度揭秘Spring框架的基础架构与工作原理
所以,当你进入这个Spring的世界,看似一片混乱,但细看之下,你会发现这里有个牢固的结构支撑,一切皆有可能。不论你要建设的是一座宏大的城堡,还是个小巧的花园,只要你的工具箱里有Spring,你就能轻松搞定。
210 9
|
6月前
|
人工智能 自然语言处理 安全
基于LlamaIndex实现CodeAct Agent:代码执行工作流的技术架构与原理
CodeAct是一种先进的AI辅助系统范式,深度融合自然语言处理与代码执行能力。通过自定义代码执行代理,开发者可精准控制代码生成、执行及管理流程。本文基于LlamaIndex框架构建CodeAct Agent,解析其技术架构,包括代码执行环境、工作流定义系统、提示工程机制和状态管理系统。同时探讨安全性考量及应用场景,如软件开发、数据科学和教育领域。未来发展方向涵盖更精细的代码生成、多语言支持及更强的安全隔离机制,推动AI辅助编程边界拓展。
280 3
基于LlamaIndex实现CodeAct Agent:代码执行工作流的技术架构与原理

热门文章

最新文章