Structured_Sink_Foreach | 学习笔记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 快速学习 Structured_Sink_Foreach

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Foreach】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12157


Structured_Sink_Foreach

内容介绍

一.Foreach Writer

二.Foreach 模式

 

如何整合

官方提供的 think 只有 kafka 和 HDFS,如果将数据落地可以使用 Foreach,一个一个处理,非常安全

 

一.Foreach Writer

1.Foreach Writer 目标

掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink,同时能够将数据落地到 MySQL

2. 步骤

需求

代码

3.需求

场景

大数据有一个常见的应用场景

(1)收集业务系统数据

(2)数据处理

(3)放入 OLTP 数据

//进行商业分析(增长率,人数等)

(4)外部通过 ECharts 获取并处理数据

//处理展示

这个场景下,StructuredStreaming 就需要处理数据水放入 MySQL 或者MongoDB,HBase 中以供 Web 程序可以

获取数据,图表的形式展示在前端

image.png


二.Foreach 模式

1.起因

·在 Structured Streaming 中,并未提供完整的 MySQL/JDBC 整合工具

·不止:MySQL 和 JDBC,可能会有其它的日标端需要写入

·很多时候 Structured Streaming 需要对按些第三方的系统,例阿里云的云存储,业马云的云存储第,但是 Spak 无法对所有第三方都提供支持,有时候需要自已编写。

解决方案

image.png

·既然无法河足所有的整合需求,StructuredStreaming 提供了 Foreach, 可以拿到每·个批次的数据

·通过 Foreach 拿到数据后,可以通过自定义写入方式,从而将数据落地到其它的系统

2.案例需求:

从 kafka 中读取数据处理后放入 MySQL

image.png

代码

1.创建 DataFrame 表示 Kafka 数据源

2.在源 DataFrame 中选择三列数据

3.创建 ForeachWriter 按收每一个批次的数据落地 MySQL

4.Foreach 落地数据

代码

Import org.apache.spark.sql.SparkSession

val spark SparkSession.builder()

master("local[6]")

appName("kafka integration")

getorCreate()

import spark.implicits._

val source spark

.readstream

format("kafka").option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")

option("subscribe","streaming-bank")

.option("startingoffsets","earliest")

.load()

.selectExpr( exprs = "CAST(value AS STRING) as value")

as[String]

//处理 csV, Dataset(String), Dataset(id, name, category)

val result = source. map(item => {

val arr = item.split( regex "::")

(arr(e). toInt, arr(1). toString, arr(2) tostring)

}).as[(Int, string, string)]. toDF( colNames "id", "name", "category")

class MySQLWriter extends ForeachWriter[Row]

private val driver ="com.mysql.jdbc.Driver"

private var connection:Connection

Private  val  url

="jdbc:mysql://node01:3306/streaming-movies-result"

private var statement:Statement =

Database

override def open(partitionId:Long,version:Long):Boolean =

class.forName(driver)

connection DriverManager.getconnection(url)

statement connection.createStatement()

true

override def process(value:Row):Unit =

statement.executeUpdate(sql =s"insert into movies values(${value.get(0)),$(value.get(1)),${value.get(2)})"

override def close(errororNull:Throwable):Unit =

connection.close()

result.writestream

foreach(new MySQLWriter)

.start()

awaitTermination()

第一步加载 driver

第二部创建 connection,url,创建 statement 执行 SQL 语句,将此类型放在 action 上。返回一个布尔值查看返回值是否正确。

写入前需要创建完成数据库和表

 

相关文章
|
2月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
38 0
|
5月前
|
存储 SQL
Structured Query
【7月更文挑战第13天】
37 4
|
7月前
|
SQL 数据处理 HIVE
【Hive】写出Hive中split、coalesce及collect_list函数的用法?
【4月更文挑战第17天】【Hive】写出Hive中split、coalesce及collect_list函数的用法?
|
分布式计算 Scala 流计算
Structured Streaming报错记录:Overloaded method foreachBatch with alternatives
Structured Streaming报错记录:Overloaded method foreachBatch with alternatives
153 0
Structured Streaming报错记录:Overloaded method foreachBatch with alternatives
|
消息中间件 缓存 分布式计算
Structured_Sink_Trigger | 学习笔记
快速学习 Structured_Sink_Trigger
Structured_Sink_Trigger | 学习笔记
|
分布式计算 大数据 开发者
RDD 算子_ Action _ foreach | 学习笔记
快速学习 RDD 算子_ Action _ foreach
RDD 算子_ Action _ foreach | 学习笔记
|
消息中间件 JSON 大数据
Structured_Source_Kafka_回顾 | 学习笔记
快速学习 Structured_Source_Kafka_回顾
Structured_Source_Kafka_回顾 | 学习笔记
|
消息中间件 分布式计算 大数据
Structured_Source_Kafka_整合 | 学习笔记
快速学习 Structured_Source_Kafka_整合
Structured_Source_Kafka_整合 | 学习笔记
|
分布式计算 大数据 API
Rdd 算子_转换_mappartitions | 学习笔记
快速学习 Rdd 算子_转换_mappartitions
168 0
Rdd 算子_转换_mappartitions | 学习笔记
|
JSON 关系型数据库 MySQL
Structured_Source_HDFS_案例介绍 | 学习笔记
快速学习 Structured_Source_HDFS_案例介绍
Structured_Source_HDFS_案例介绍 | 学习笔记