5万字Spark全集之末尾Structured Streaming续集!!!!!(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 5万字Spark全集之末尾Structured Streaming续集!!!!!

6、output mode

640.png

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

这里有三种输出模型:

1.Append mode:输出新增的行,默认模式。每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询select,where,map,flatMap,filter,join等会支持追加模式。不支持聚合

2.Complete mode: 所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。

3.Update mode: 输出更新的行,每次更新结果集时,仅将被更新的结果行输出到接收器(自Spark 2.1.1起可用),不支持排序。

7、output sink

640.png

使用说明

File sink 输出到路径支持parquet文件,以及append模式

writeStream
  .format("parquet")       // can be "orc", "json", "csv", etc.
  .option("path", "path/to/destination/dir")
  .start()

Kafka sink 输出到kafka内的一到多个topic

writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "updates")
  .start()

Foreach sink 对输出中的记录运行任意计算。

writeStream
  .foreach(...)
  .start()

Console sink (for debugging) 当有触发器时,将输出打印到控制台。

writeStream
  .format("console")
  .start()

Memory sink (for debugging) - 输出作为内存表存储在内存中

writeStream
  .format("memory")
  .queryName("tableName")
  .start()

官网示例代码

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")  
// Print new data to console
noAggDF.writeStream.format("console").start()
// Write new data to Parquet files
noAggDF.writeStream.format("parquet").option("checkpointLocation",
"path/to/checkpoint/dir").option("path", "path/to/destination/dir").start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF.writeStream.outputMode("complete").format("console").start()
// Have all the aggregates in an in-memory table
aggDF.writeStream.queryName("aggregates").outputMode("complete").format("memory").start()
spark.sql("select * from aggregates").show()  
// interactively query in-memory table

十一、StructuredStreaming与其他技术整合

1、整合Kafka

1.1 官网介绍

http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

640.png

Creating a Kafka Source for Streaming Queries

// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics(多个topic)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern(订阅通配符topic)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

Creating a Kafka Source for Batch Queries(kafka批处理查询)

// Subscribe to 1 topic
//defaults to the earliest and latest offsets(默认为最早和最新偏移)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, (多个topic)
//specifying explicit Kafka offsets(指定明确的偏移量)
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, (订阅通配符topic)at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

注意:读取后的数据的Schema是固定的,包含的列如下:


d8c3d095da9b286debd1bbdd3ece4347.png


注意:下面的参数是不能被设置的,否则kafka会抛出异常:

group.id:kafka的source会在每次query的时候自定创建唯一的group id。

auto.offset.reset :为了避免每次手动设置startingoffsets的值,structured streaming在内部消费时会自动管理offset。这样就能保证订阅动态的topic时不会丢失数据。startingOffsets在流处理时,只会作用于第一次启动时,之后的处理都会自动的读取保存的offset。

key.deserializer,value.deserializer,key.serializer,value.serializer 序列化与反序列化,都ByteArraySerializer。

enable.auto.commit:Kafka源不支持提交任何偏移量。

640.png

1.2 整合环境准备

启动kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon
/export/servers/kafka/config/server.properties

向topic中生产数据

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

代码实现

package cn.itcast.structedstreaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object KafkaStructuredStreamingDemo {
def main(args: Array[String]): Unit = {
  //1.创建SparkSession
  val spark: SparkSession =
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("WARN")
  import spark.implicits._
  //2.连接Kafka消费数据
  val dataDF: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "node01:9092")
    .option("subscribe", "spark_kafka")
    .load()
  //3.处理数据
  //注意:StructuredStreaming整合Kafka获取到的数据都是字节类型,所以需要按照官网要求,
//转成自己的实际类型
  val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
  val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
  val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
  result.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(0))
    .option("truncate",false)//超过长度的列不截断显示,即完全显示
    .start()
    .awaitTermination()
}
}

2、整合MySQL

2.1 简介

需求

我们开发中经常需要将流的运算结果输出到外部数据库,例如MySQL中,但是比较遗憾Structured Streaming API不支持外部数据库作为接收器

如果将来加入支持的话,它的API将会非常的简单比如:

format(“jdbc”).option(“url”,“jdbc:mysql://…”).start()

但是目前我们只能自己自定义一个JdbcSink,继承ForeachWriter并实现其方法

参考网站

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html

640.png

2.2 代码演示
package cn.itcast.structedstreaming
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
object JDBCSinkDemo {
def main(args: Array[String]): Unit = {
  //1.创建SparkSession
  val spark: SparkSession =
SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
  val sc: SparkContext = spark.sparkContext
  sc.setLogLevel("WARN")
  import spark.implicits._
  //2.连接Kafka消费数据
  val dataDF: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "node01:9092")
    .option("subscribe", "spark_kafka")
    .load()
  //3.处理数据
  //注意:StructuredStreaming整合Kafka获取到的数据都是字节类型,所以需要按照官网要求,转成自己的实际类型
  val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
  val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
  val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)
  val writer = new JDBCSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
  result.writeStream
    .foreach(writer)
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(0))
    .start()
    .awaitTermination()
}
class JDBCSink(url:String,username:String,password:String) extends ForeachWriter[Row] with Serializable{
  var connection:Connection = _ //_表示占位符,后面会给变量赋值
  var preparedStatement: PreparedStatement = _
  //开启连接
  override def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, username, password)
    true
  }
  /*
  CREATE TABLE `t_word` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `word` varchar(255) NOT NULL,
      `count` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `word` (`word`)
    ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
    */
  //replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);
  //处理数据--存到MySQL
  override def process(row: Row): Unit = {
    val word: String = row.get(0).toString
    val count: String = row.get(1).toString
    println(word+":"+count)
    //REPLACE INTO:表示如果表中没有数据这插入,如果有数据则替换
    //注意:REPLACE INTO要求表有主键或唯一索引
    val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
    preparedStatement = connection.prepareStatement(sql)
    preparedStatement.setString(1,word)
    preparedStatement.setInt(2,Integer.parseInt(count))
    preparedStatement.executeUpdate()
  }
  //关闭资源
  override def close(errorOrNull: Throwable): Unit = {
    if (connection != null){
      connection.close()
    }
    if(preparedStatement != null){
      preparedStatement.close()
    }
  }
}
}
相关文章
|
26天前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
34 0
|
26天前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
68 0
|
26天前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
51 0
|
26天前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
28 0
|
26天前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
39 0
|
26天前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
30 0
|
26天前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
27 0
|
消息中间件 分布式计算 Kafka
195 Spark Streaming整合Kafka完成网站点击流实时统计
195 Spark Streaming整合Kafka完成网站点击流实时统计
72 0
|
消息中间件 分布式计算 Kafka
大数据Spark Structured Streaming集成 Kafka
大数据Spark Structured Streaming集成 Kafka
123 0
|
消息中间件 分布式计算 Kafka
大数据Spark Streaming集成Kafka
大数据Spark Streaming集成Kafka
120 0