5万字Spark全集之末尾Structured Streaming续集!!!!!(二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 5万字Spark全集之末尾Structured Streaming续集!!!!!

6、output mode

640.png

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

这里有三种输出模型:

1.Append mode:输出新增的行,默认模式。每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询select,where,map,flatMap,filter,join等会支持追加模式。不支持聚合

2.Complete mode: 所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。

3.Update mode: 输出更新的行,每次更新结果集时,仅将被更新的结果行输出到接收器(自Spark 2.1.1起可用),不支持排序。

7、output sink

640.png

使用说明

File sink 输出到路径支持parquet文件,以及append模式

writeStream
  .format("parquet")       // can be "orc", "json", "csv", etc.
  .option("path", "path/to/destination/dir")
  .start()

Kafka sink 输出到kafka内的一到多个topic

writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "updates")
  .start()

Foreach sink 对输出中的记录运行任意计算。

writeStream
  .foreach(...)
  .start()

Console sink (for debugging) 当有触发器时,将输出打印到控制台。

writeStream
  .format("console")
  .start()

Memory sink (for debugging) - 输出作为内存表存储在内存中

writeStream
  .format("memory")
  .queryName("tableName")
  .start()

官网示例代码

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")  
// Print new data to console
noAggDF.writeStream.format("console").start()
// Write new data to Parquet files
noAggDF.writeStream.format("parquet").option("checkpointLocation",
"path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF.writeStream.outputMode("complete").format("console").start()
// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()  
// interactively query in-memory table

十一、StructuredStreaming与其他技术整合

1、整合Kafka

1.1 官网介绍

http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

640.png

Creating a Kafka Source for Streaming Queries

// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics(多个topic)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern(订阅通配符topic)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

Creating a Kafka Source for Batch Queries(kafka批处理查询)

// Subscribe to 1 topic
//defaults to the earliest and latest offsets(默认为最早和最新偏移)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, (多个topic)
//specifying explicit Kafka offsets(指定明确的偏移量)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, (订阅通配符topic)at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

注意:读取后的数据的Schema是固定的,包含的列如下:


d8c3d095da9b286debd1bbdd3ece4347.png


注意:下面的参数是不能被设置的,否则kafka会抛出异常:

group.id:kafka的source会在每次query的时候自定创建唯一的group id。

auto.offset.reset :为了避免每次手动设置startingoffsets的值,structured streaming在内部消费时会自动管理offset。这样就能保证订阅动态的topic时不会丢失数据。startingOffsets在流处理时,只会作用于第一次启动时,之后的处理都会自动的读取保存的offset。

key.deserializer,value.deserializer,key.serializer,value.serializer 序列化与反序列化,都ByteArraySerializer。

enable.auto.commit:Kafka源不支持提交任何偏移量。

640.png

1.2 整合环境准备

启动kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon
/export/servers/kafka/config/server.properties

向topic中生产数据

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

代码实现

package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object KafkaStructuredStreamingDemo {
def main(args: Array[String]): Unit = {
  //1.创建SparkSession
  val spark: SparkSession =
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("WARN")
  import spark.implicits._
  //2.连接Kafka消费数据
  val dataDF: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "node01:9092")
    .option("subscribe", "spark_kafka")
    .load()
  //3.处理数据
  //注意:StructuredStreaming整合Kafka获取到的数据都是字节类型,所以需要按照官网要求,
//转成自己的实际类型
  val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
  val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
  val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
  result.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(0))
    .option("truncate",false)//超过长度的列不截断显示,即完全显示
    .start()
    .awaitTermination()
}
}

2、整合MySQL

2.1 简介

需求

我们开发中经常需要将流的运算结果输出到外部数据库,例如MySQL中,但是比较遗憾Structured Streaming API不支持外部数据库作为接收器

如果将来加入支持的话,它的API将会非常的简单比如:

format(“jdbc”).option(“url”,“jdbc:mysql://…”).start()

但是目前我们只能自己自定义一个JdbcSink,继承ForeachWriter并实现其方法

参考网站

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

640.png

2.2 代码演示
package cn.itcast.structedstreaming
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
object JDBCSinkDemo {
def main(args: Array[String]): Unit = {
  //1.创建SparkSession
  val spark: SparkSession =
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("WARN")
  import spark.implicits._
  //2.连接Kafka消费数据
  val dataDF: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "node01:9092")
    .option("subscribe", "spark_kafka")
    .load()
  //3.处理数据
  //注意:StructuredStreaming整合Kafka获取到的数据都是字节类型,所以需要按照官网要求,转成自己的实际类型
  val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
  val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
  val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
  val writer = new JDBCSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
  result.writeStream
    .foreach(writer)
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(0))
    .start()
    .awaitTermination()
}
class JDBCSink(url:String,username:String,password:String) extends ForeachWriter[Row] with Serializable{
  var connection:Connection = _ //_表示占位符,后面会给变量赋值
  var preparedStatement: PreparedStatement = _
  //开启连接
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, username, password)
    true
  }
  /*
  CREATE TABLE `t_word` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `word` varchar(255) NOT NULL,
      `count` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `word` (`word`)
    ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
    */
  //replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);
  //处理数据--存到MySQL
  override def process(row: Row): Unit = {
    val word: String = row.get(0).toString
    val count: String = row.get(1).toString
    println(word+":"+count)
    //REPLACE INTO:表示如果表中没有数据这插入,如果有数据则替换
    //注意:REPLACE INTO要求表有主键或唯一索引
    val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
    preparedStatement = connection.prepareStatement(sql)
    preparedStatement.setString(1,word)
    preparedStatement.setInt(2,Integer.parseInt(count))
    preparedStatement.executeUpdate()
  }
  //关闭资源
  override def close(errorOrNull: Throwable): Unit = {
    if (connection != null){
      connection.close()
    }
    if(preparedStatement != null){
      preparedStatement.close()
    }
  }
}
}
相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
53 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
109 0
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
66 0
|
3月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
58 0
|
3月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
58 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
42 0
|
分布式计算 API 流计算
Spark 2.0 Structured Streaming 分析
Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据,复用了其对象的Catalyst引擎。
3627 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
156 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
78 0