开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Trigger】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12158
Structured_Sink_Trigger
内容介绍
一.Tigger
二. 步骤
三.连续流处理
一.Tigger
指定批次,Structured Sink 批次设置较重要.
1.Tigger 目标
掌握如何控制 StructuredStreaming 的处理时间
二. 步骤
微批次处理
连续流处理
1.微批次处理
数据在时间线不断产生,流式数据。
每个结果是中间批次产生的结果,速度较快。
并不是真正的流,而是缓存一个批次网期的数据,后处理这一批次的数据
2.通用流程
步骤
1.根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame
·Rate 数据源会定期提供一个由两列 timestamp,value 组成的数据,value 是一个随机数
2.处理利和聚合数据,计算每个个位数和十位数各有多少条数烟
·对 value 求 log10 即可得出其位数
·后按照位数进行分组,最终就可以看到每个位数的数据行多少个
代码
val spark =SparkSession.builder()
.master(="local[6]")
appName("socket_processor")
getorcreate()
import org.apache.spark.sql.functions._
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")
val source spark.readstream
format("rate")
.load()
val result source.select(1og10('value)cast IntegerType as 'key.'value)
agg(count('key)as count)
select('key,'count)
where('key.isNotNull)
sort('key.asc)
3. 代码
import org.apache.spark.sq1.SparkSession
import org.apache.spark.sql.streaming.OutputMode
object Triggers
def main(args:Array[String]):Unit =
//创建数据源
val spark SparkSession.builder()
appName(name="triggers")
master(master ="local[6]")getorcreate()
// timestamp,value
val source spark.readstream
.format(source="rate")
//一般用于测试演示,模拟一个假的数据源
.load()
//简单处理
val result source
//落地
source.writestream
format(source "console")
outputMode(OutputMode.Complete())
start()
.awaitTermination()
若 Complete 中间必须有操作,其展示的是全集操作。
若不存在聚合操作,则不可用。
限制输出 spark sparkContext.setloglevel(“wang”)
下一个批次立即执行,
用 trigger(trigger.Processingtime())可以指定时间间隔,
按照固定的时间间隔划分批次
如果指定间隔为零,相当于默认批次划分
如果前一批次的时间提前完成,等待此间隔到达之后,才会进入下一个批次
如果前一个批次延迟完成,么下一个皮子会在前一个批次完成后立即执行
trigger(trigger.once())
:只执行一次,运行后不再处理。
遗留信息处理完整会用到。
微批次无需考虑数据一致性,数据更容易处理,但时间颇慢三
二.连续流处理
1.介绍
微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理适迟取决于每个
DataFrame 的处理速度,最快也只能在一~个 DataFrame 结束后立刻执行下一个,位快可以达到 100ms 左右的端到端
延迟
而连续流处理可以做到大约 1ms 的端到端数据处理延迟
连续流处理可以达到 at-least-once 的容错语义
从 Spark2.3 版本开始支持连续流处理,我们所采用的 2.2 版木还没有这个特性并且这个静性度止到 2.4 发然是
实验性质,不建议在生产环境中使用
·操作
的流程
2.步骤
使用特殊的 Trig8er 完成功能
3.代码
result.writestream
outputMode(OutputMode.Complete())
format("console")
trigger(Trigger.Continuous("1 second"))
start()
awaitTermination()
4.限制(限制较多)
只支持 Map 类的有类型探作
只支持普通的的 Q@ 类操作,不支持聚给
Source 只支持 Kafka
Sink 只支特 Kafka,Console,Memory