Structured_Sink_Trigger | 学习笔记

简介: 快速学习 Structured_Sink_Trigger

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Trigger】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12158


Structured_Sink_Trigger

内容介绍

一.Tigger

二.   步骤

三.连续流处理

 

一.Tigger

指定批次,Structured Sink 批次设置较重要.

1.Tigger 目标

掌握如何控制 StructuredStreaming 的处理时间

 

二.  步骤

微批次处理

连续流处理

1.微批次处理

image.png

数据在时间线不断产生,流式数据。

每个结果是中间批次产生的结果,速度较快。

并不是真正的流,而是缓存一个批次网期的数据,后处理这一批次的数据

2.通用流程

步骤

1.根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame

·Rate 数据源会定期提供一个由两列 timestamp,value 组成的数据,value 是一个随机数

2.处理利和聚合数据,计算每个个位数和十位数各有多少条数烟

·对 value 求 log10 即可得出其位数

·后按照位数进行分组,最终就可以看到每个位数的数据行多少个

代码

val spark =SparkSession.builder()

.master(="local[6]")

appName("socket_processor")

getorcreate()

import org.apache.spark.sql.functions._

import spark.implicits._

spark.sparkContext.setLogLevel("ERROR")

val source spark.readstream

format("rate")

.load()

val result source.select(1og10('value)cast IntegerType as 'key.'value)

agg(count('key)as count)

select('key,'count)

where('key.isNotNull)

sort('key.asc)

3. 代码

import org.apache.spark.sq1.SparkSession

import org.apache.spark.sql.streaming.OutputMode

object Triggers

def main(args:Array[String]):Unit =

//创建数据源

val spark SparkSession.builder()

appName(name="triggers")

master(master ="local[6]")getorcreate()

// timestamp,value

val source spark.readstream

.format(source="rate")

//一般用于测试演示,模拟一个假的数据源

.load()

//简单处理

val result source

//落地

source.writestream

format(source "console")

outputMode(OutputMode.Complete())

start()

.awaitTermination()

若 Complete 中间必须有操作,其展示的是全集操作。

若不存在聚合操作,则不可用。

image.png

限制输出  spark sparkContext.setloglevel(“wang”)

image.png

下一个批次立即执行,

用 trigger(trigger.Processingtime())可以指定时间间隔,

按照固定的时间间隔划分批次

如果指定间隔为零,相当于默认批次划分

如果前一批次的时间提前完成,等待此间隔到达之后,才会进入下一个批次

如果前一个批次延迟完成,么下一个皮子会在前一个批次完成后立即执行

image.png

image.png

trigger(trigger.once()):只执行一次,运行后不再处理。

遗留信息处理完整会用到。

image.png

微批次无需考虑数据一致性,数据更容易处理,但时间颇慢三

 

二.连续流处理

1.介绍

微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理适迟取决于每个

DataFrame 的处理速度,最快也只能在一~个 DataFrame 结束后立刻执行下一个,位快可以达到 100ms 左右的端到端

延迟

而连续流处理可以做到大约 1ms 的端到端数据处理延迟

连续流处理可以达到 at-least-once 的容错语义

从 Spark2.3 版本开始支持连续流处理,我们所采用的 2.2 版木还没有这个特性并且这个静性度止到 2.4 发然是

实验性质,不建议在生产环境中使用

·操作

的流程

2.步骤

使用特殊的 Trig8er 完成功能

3.代码

result.writestream

outputMode(OutputMode.Complete())

format("console")

trigger(Trigger.Continuous("1 second"))

start()

awaitTermination()

4.限制(限制较多)

只支持 Map 类的有类型探作

只支持普通的的 Q@ 类操作,不支持聚给

Source 只支持 Kafka

Sink 只支特 Kafka,Console,Memory

相关文章
|
存储 SQL 消息中间件
实战|使用Spark Structured Streaming写入Hudi
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
891 0
实战|使用Spark Structured Streaming写入Hudi
|
消息中间件 Oracle 关系型数据库
Flink CDC(Change Data Capture)
Flink CDC(Change Data Capture)是一个 Flink 应用程序,用于从关系型数据库(如 MySQL、PostgreSQL 等)中捕获数据更改,将其转换为流数据,并将其发送到 Flink
318 1
|
消息中间件 Oracle 关系型数据库
Flink CDC (Change Data Capture)
Flink CDC (Change Data Capture) 是一种基于 Flink 的流式数据处理技术,用于捕获数据源的变化,并将变化发送到下游系统。Flink CDC 可以将数据源的变化转换为流式数据,并实时地将数据流发送到下游系统,以便下游系统及时处理这些变化。
572 1
|
消息中间件 SQL 分布式计算
Structured_Sink_Foreach | 学习笔记
快速学习 Structured_Sink_Foreach
Structured_Sink_Foreach | 学习笔记
|
消息中间件 分布式计算 大数据
Structured_Source_Kafka_整合 | 学习笔记
快速学习 Structured_Source_Kafka_整合
Structured_Source_Kafka_整合 | 学习笔记
|
消息中间件 JSON 大数据
Structured_Source_Kafka_回顾 | 学习笔记
快速学习 Structured_Source_Kafka_回顾
Structured_Source_Kafka_回顾 | 学习笔记
|
消息中间件 分布式计算 Hadoop
Structured_Sink_Kafka | 学习笔记
快速学习 Structured_Sink_Kafka
Structured_Sink_Kafka | 学习笔记
|
消息中间件 分布式计算 大数据
Structured_Sink_HDFS | 学习笔记
快速学习 Structured_Sink_HDFS
Structured_Sink_HDFS | 学习笔记
下一篇
无影云桌面