开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Foreach】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12157
Structured_Sink_Foreach
内容介绍
一.Foreach Writer
二.Foreach 模式
如何整合
官方提供的 think 只有 kafka 和 HDFS,如果将数据落地可以使用 Foreach,一个一个处理,非常安全
一.Foreach Writer
1.Foreach Writer 目标
掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink,同时能够将数据落地到 MySQL
2. 步骤
需求
代码
3.需求
场景
大数据有一个常见的应用场景
(1)收集业务系统数据
(2)数据处理
(3)放入 OLTP 数据
//进行商业分析(增长率,人数等)
(4)外部通过 ECharts 获取并处理数据
//处理展示
这个场景下,StructuredStreaming 就需要处理数据水放入 MySQL 或者MongoDB,HBase 中以供 Web 程序可以
获取数据,图表的形式展示在前端
二.Foreach 模式
1.起因
·在 Structured Streaming 中,并未提供完整的 MySQL/JDBC 整合工具
·不止:MySQL 和 JDBC,可能会有其它的日标端需要写入
·很多时候 Structured Streaming 需要对按些第三方的系统,例阿里云的云存储,业马云的云存储第,但是 Spak 无法对所有第三方都提供支持,有时候需要自已编写。
解决方案
·既然无法河足所有的整合需求,StructuredStreaming 提供了 Foreach, 可以拿到每·个批次的数据
·通过 Foreach 拿到数据后,可以通过自定义写入方式,从而将数据落地到其它的系统
2.案例需求:
从 kafka 中读取数据处理后放入 MySQL
代码
1.创建 DataFrame 表示 Kafka 数据源
2.在源 DataFrame 中选择三列数据
3.创建 ForeachWriter 按收每一个批次的数据落地 MySQL
4.Foreach 落地数据
代码
Import org.apache.spark.sql.SparkSession
val spark SparkSession.builder()
master("local[6]")
appName("kafka integration")
getorCreate()
import spark.implicits._
val source spark
.readstream
format("kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")
option("subscribe","streaming-bank")
.option("startingoffsets","earliest")
.load()
.selectExpr( exprs = "CAST(value AS STRING) as value")
as[String]
//处理 csV, Dataset(String), Dataset(id, name, category)
val result = source. map(item => {
val arr = item.split( regex "::")
(arr(e). toInt, arr(1). toString, arr(2) tostring)
}).as[(Int, string, string)]. toDF( colNames "id", "name", "category")
class MySQLWriter extends ForeachWriter[Row]
private val driver ="com.mysql.jdbc.Driver"
private var connection:Connection
Private val url
="jdbc:mysql://node01:3306/streaming-movies-result"
private var statement:Statement =
Database
override def open(partitionId:Long,version:Long):Boolean =
class.forName(driver)
connection DriverManager.getconnection(url)
statement connection.createStatement()
true
override def process(value:Row):Unit =
statement.executeUpdate(sql =s"insert into movies values(${value.get(0)),$(value.get(1)),${value.get(2)})"
override def close(errororNull:Throwable):Unit =
connection.close()
result.writestream
foreach(new MySQLWriter)
.start()
awaitTermination()
第一步加载 driver
第二部创建 connection,url,创建 statement 执行 SQL 语句,将此类型放在 action 上。返回一个布尔值查看返回值是否正确。
写入前需要创建完成数据库和表