Structured Streaming架构原理详解!

本文涉及的产品
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 笔记

一、Structured Streaming概述


Structured Streaming是一个基于sparksql引擎开发的可伸展和容错的流处理引擎。Structured Streaming传输中的关键思想是将实时数据流视为被连续添加的表。

这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark在无界输入表上将其作为增量查询运行。


程式设计模型

将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。1.png

对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。

2.png

“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:


Complete Mode:整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。

Append Mode:仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。

Update Mode:仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。3.png


二、Structured Streaming与Socket集成


以complete输出为例

[caizhengjie@node1 kafka]$ nc -lk 9999
java java java 
python java 
java java python java hive hove
hbase
hbase hbase hive python java
package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 3:36 下午
 */
object StructuredStreamingTest {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 创建一个流数据框架
        val lines = spark.readStream
                .format("socket")
                .option("host","node1")
                .option("port",9999)
                .load() // 返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val words = lines.as[String].flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果

4.png


三、Structured Streaming与Kafka集成

Kafka 0.10的结构化流集成,可从Kafka读取数据或向Kafka写入数据。


(1)通过IDEA工具

对于使用Maven项目定义的Scala 应用程序,需要加载pom.xml配置文件

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${saprk.version}</version>
</dependency>

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
>java java python python
>hive hive java java 

运行程序

package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 4:49 下午
 */
object StructuredStreamingKafka {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 读取kafka stream流
        val df = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "node1:9092")
                .option("subscribe", "test")
                .load() //返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val lines = df.selectExpr("CAST(value AS STRING)").as[String]
        val words =  lines.flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果:

5.png

(2)通过spark-shell

在通过spark-shell运行时,需要将下面的jar包拷贝到spark的jar目录下

6.png

kafka_2.11-2.1.1.jar和kafka-clients-2.1.1.jar在kafka的lib中能找到

spark-sql-kafka-0-10_2.11-2.4.6.jar和spark-streaming-kafka-0-10_2.11-2.4.6.jar需要到maven的仓库下找

7.png

image.png

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test

运行spark-shell

bin/spark-shell --master local[2]
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "test")
.load()
import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words =  lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出
val query = wordCounts.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
// Exiting paste mode, now interpreting.

查看运行结果:

9.png


四、Structured Streaming与MySQL集成


关于Structured Streaming与MySQL集成可以见文档:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


通常,我们希望能够将流的输出写入外部数据库(例如MySQL)。在撰写本文时,结构化流API不支持将外部数据库作为接收器。但是,这样做的话,API选项将像一样简单.format(“jdbc”).start(“jdbc:mysql/…”)。同时,我们可以使用foreach接收器来完成此任务。让我们创建一个自定义JDBC Sink,它扩展了ForeachWriter并实现了其方法。10.png

我们现在就可以使用我们的JDBCSink了:

11.png

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

12.png


相关文章
|
1天前
|
监控 Kubernetes 持续交付
后端开发中的微服务架构:原理、优势与实践
本文深入探讨了在现代后端开发中,微服务架构如何成为提升系统可维护性、扩展性和敏捷性的关键技术。文章首先定义了微服务并解释了其核心原理,随后通过数据和案例分析,展示了微服务架构如何优化开发流程和提高系统性能。最后,文中提供了实施微服务架构的实用建议,旨在帮助开发者更好地理解和应用这一架构模式。
|
5天前
|
存储 SQL 分布式计算
技术心得记录:深入学习HBase架构原理
技术心得记录:深入学习HBase架构原理
|
14天前
|
存储 关系型数据库 MySQL
MySQL数据库进阶第六篇(InnoDB引擎架构,事务原理,MVCC)
MySQL数据库进阶第六篇(InnoDB引擎架构,事务原理,MVCC)
|
15天前
|
存储 传感器 编解码
【Camera基础(二)】摄像头驱动原理和开发&&V4L2子系统驱动架构
【Camera基础(二)】摄像头驱动原理和开发&&V4L2子系统驱动架构
|
15天前
|
编解码 Linux API
【Camera基础(一)】Camera摄像头工作原理及整机架构
【Camera基础(一)】Camera摄像头工作原理及整机架构
|
2月前
|
运维 监控 安全
WLAN的组网架构和工作原理
WLAN的组网架构和工作原理
47 0
|
2月前
|
存储 移动开发 前端开发
【Uniapp 专栏】Uniapp 架构设计与原理探究
【5月更文挑战第12天】Uniapp是一款用于跨平台移动应用开发的框架,以其高效性和灵活性脱颖而出。它基于HTML、CSS和Vue.js构建视图层,JavaScript处理逻辑层,管理数据层,实现统一编码并支持原生插件扩展。通过抽象平台特性,开发者能专注于业务逻辑,提高开发效率。尽管存在兼容性和复杂性挑战,但深入理解其架构设计与原理将助力开发者创建高质量的跨平台应用。随着技术进步,Uniapp将继续在移动开发领域扮演重要角色。
【Uniapp 专栏】Uniapp 架构设计与原理探究
|
2月前
|
负载均衡 NoSQL 关系型数据库
深入浅出Redis(六):Redis的主从架构与主从复制原理
深入浅出Redis(六):Redis的主从架构与主从复制原理
|
2月前
|
负载均衡 Java 开发者
Spring Cloud:一文读懂其原理与架构
Spring Cloud 是一套微服务解决方案,它整合了Netflix公司的多个开源框架,简化了分布式系统开发。Spring Cloud 提供了服务注册与发现、配置中心、消息总线、负载均衡、熔断机制等工具,让开发者可以快速地构建一些常见的微服务架构。
|
2月前
|
机器学习/深度学习 语音技术 网络架构
【视频】LSTM神经网络架构和原理及其在Python中的预测应用|数据分享
【视频】LSTM神经网络架构和原理及其在Python中的预测应用|数据分享