6、output mode
Whenever the result table is updated, we want to write the changed result rows to an external sink.
There are three output modes (a short usage sketch follows the list):
1. Append mode: outputs only newly added rows; this is the default. Each time the result table is updated, only the rows newly appended to it are written to the sink. It is supported only for queries where rows added to the result table never change afterwards, so this mode guarantees that each row is output exactly once. For example, queries using only select, where, map, flatMap, filter, join, etc. support append mode. Aggregations are not supported (unless they are event-time aggregations with a watermark).
2. Complete mode: outputs everything; after every trigger, the entire result table is written to the sink. Only queries that contain aggregation operations support this mode.
3. Update mode: outputs only the updated rows; each time the result table is updated, only the result rows that changed are written to the sink (available since Spark 2.1.1). Sorting is not supported.
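As a quick illustration, a minimal sketch of how the mode is chosen on the writer (linesDF and countsDF are hypothetical non-aggregated and aggregated DataFrames, not defined in this section):

// Append (the default): only rows newly added to the result table
linesDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

// Complete: the whole result table after every trigger (requires an aggregation)
countsDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Update: only the rows that changed since the last trigger
countsDF.writeStream
  .outputMode("update")
  .format("console")
  .start()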
7、output sink
Usage notes for each sink:
File sink: writes the output to a directory; supports file formats such as parquet, and only the append output mode.
writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
Kafka sink: writes the output to one or more topics in Kafka (see the note after this sink list).
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
Foreach sink: runs arbitrary computation on the records in the output.
writeStream
  .foreach(...)
  .start()
Console sink (for debugging): prints the output to the console on every trigger.
writeStream .format("console") .start()
Memory sink (for debugging): the output is stored in memory as an in-memory table.
writeStream .format("memory") .queryName("tableName") .start()
Example code from the official documentation
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF.writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF.writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
十一、Integrating Structured Streaming with Other Technologies
1、Kafka Integration
1.1 Official documentation
http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Creating a Kafka Source for Streaming Queries
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern (wildcard topics)
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Creating a Kafka Source for Batch Queries
// Subscribe to 1 topic
// defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics,
// specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern (wildcard topics), at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Note: the schema of the data read from Kafka is fixed and contains the following columns: key, value, topic, partition, offset, timestamp, timestampType (a short sketch that prints this schema follows the list below).
Note: the following Kafka parameters cannot be set; otherwise the Kafka source will throw an exception:
group.id: the Kafka source automatically creates a unique group id for every query.
auto.offset.reset: set the source option startingOffsets instead; Structured Streaming manages the consumed offsets internally, which guarantees that no data is missed when dynamically subscribed topics or partitions appear. startingOffsets only applies when a query is started for the first time; every subsequent run resumes from the saved offsets.
key.deserializer, value.deserializer, key.serializer, value.serializer: keys and values are always handled as byte arrays (ByteArraySerializer / ByteArrayDeserializer); cast them explicitly with DataFrame operations.
enable.auto.commit: the Kafka source does not commit any offsets.
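To see the fixed schema and the startingOffsets behavior in practice, a minimal sketch, assuming an existing SparkSession spark and the broker/topic prepared in 1.2 below:

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "node01:9092")   // broker from the environment setup
  .option("subscribe", "spark_kafka")                  // topic from the environment setup
  .option("startingOffsets", "earliest")               // only honored the first time the query starts
  .load()

// Prints the fixed Kafka source schema:
// key, value, topic, partition, offset, timestamp, timestampType
kafkaDF.printSchema()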
1.2 Environment setup
Start Kafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
Produce data to the topic
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
Code implementation
package cn.itcast.structedstreaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object KafkaStructuredStreamingDemo {
  def main(args: Array[String]): Unit = {
    //1. Create a SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2. Connect to Kafka and consume data
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()

    //3. Process the data
    //Note: the data Structured Streaming reads from Kafka is binary, so, as the official docs require,
    //it has to be cast to the actual type first
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)

    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .option("truncate", false) //do not truncate long columns, i.e. show them in full
      .start()
      .awaitTermination()
  }
}
2、MySQL Integration
2.1 Overview
Requirement
In development we often need to write streaming results to an external database such as MySQL, but unfortunately the Structured Streaming API does not support external databases as sinks out of the box.
If such support is added in the future, its API will probably be very simple, for example:
format("jdbc").option("url", "jdbc:mysql://…").start()
For now, however, we have to write our own JdbcSink by extending ForeachWriter and implementing its methods (a minimal skeleton is sketched below; the full example follows in 2.2).
Reference
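A minimal skeleton of such a writer, showing the three methods that have to be implemented (the println is just a stand-in for real sink logic):

import org.apache.spark.sql.{ForeachWriter, Row}

class MyRowWriter extends ForeachWriter[Row] {
  //called once per partition and trigger; return true to process this partition's data
  override def open(partitionId: Long, version: Long): Boolean = true

  //called once for every output row; a real sink would issue its writes here
  override def process(row: Row): Unit = println(row)

  //called when the partition is finished (or has failed); release resources here
  override def close(errorOrNull: Throwable): Unit = ()
}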
2.2 Code example
package cn.itcast.structedstreaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger

object JDBCSinkDemo {
  def main(args: Array[String]): Unit = {
    //1. Create a SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2. Connect to Kafka and consume data
    val dataDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node01:9092")
      .option("subscribe", "spark_kafka")
      .load()

    //3. Process the data
    //Note: the data Structured Streaming reads from Kafka is binary, so it has to be cast to the actual type first
    val dataDS: Dataset[String] = dataDF.selectExpr("CAST(value AS STRING)").as[String]
    val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))
    val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)

    val writer = new JDBCSink("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    result.writeStream
      .foreach(writer)
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }

  class JDBCSink(url: String, username: String, password: String) extends ForeachWriter[Row] with Serializable {
    var connection: Connection = _ //_ is a placeholder; the variable is assigned later
    var preparedStatement: PreparedStatement = _

    //open the connection
    override def open(partitionId: Long, version: Long): Boolean = {
      connection = DriverManager.getConnection(url, username, password)
      true
    }

    /*
    CREATE TABLE `t_word` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `word` varchar(255) NOT NULL,
      `count` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `word` (`word`)
    ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
    */
    //replace INTO `bigdata`.`t_word` (`id`, `word`, `count`) VALUES (NULL, NULL, NULL);

    //process the data -- store it in MySQL
    override def process(row: Row): Unit = {
      val word: String = row.get(0).toString
      val count: String = row.get(1).toString
      println(word + ":" + count)
      //REPLACE INTO: insert the row if it does not exist, otherwise replace it
      //Note: REPLACE INTO requires the table to have a primary key or a unique index
      val sql = "REPLACE INTO `t_word` (`id`, `word`, `count`) VALUES (NULL, ?, ?);"
      preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setString(1, word)
      preparedStatement.setInt(2, Integer.parseInt(count))
      preparedStatement.executeUpdate()
    }

    //release resources
    override def close(errorOrNull: Throwable): Unit = {
      if (preparedStatement != null) {
        preparedStatement.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
  }
}
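For reference, on Spark 2.4+ another common way to reach MySQL is foreachBatch, which hands each micro-batch to the regular batch DataFrameWriter so the built-in JDBC writer can be reused. A minimal sketch, assuming the same result Dataset as above and a hypothetical t_word_batch table (URL and credentials are the same placeholders as in the example):

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "root")

// Each micro-batch is written with the batch JDBC writer;
// with complete output mode the table is simply overwritten on every trigger.
val writeBatchToMySQL: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  batchDF.toDF("word", "count")
    .write
    .mode(SaveMode.Overwrite)
    .jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "t_word_batch", props)
}

result.writeStream
  .outputMode("complete")
  .foreachBatch(writeBatchToMySQL)
  .start()
  .awaitTermination()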