【Flink】(六)ProcessFunction API(底层 API)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink】(六)ProcessFunction API(底层 API)

文章目录


一、前言

二、KeyedProcessFunction

三、TimerService 和 定时器(Timers)

四、侧输出流(SideOutput)

五、CoProcessFunction


一、前言


我们之前学习的转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如 MapFunction 这样的 map 转换算子就无法访问时间戳或者当前事件的事件时间。


基于此,DataStream API 提供了一系列的 Low-Level 转换算子。可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window 函数和转换算子无法实现)。例如,Flink SQL 就是使用 Process Function 实现的。


Flink 提供了 8 个 Process Function:


  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction


二、KeyedProcessFunction


重点介绍下 KeyedProcessFunction。


KeyedProcessFunction 用来操作 KeyedStream。KeyedProcessFunction 会处理流的每一个元素,输出为 0 个、1 个或者多个元素。所有的 Process Function 都继承自RichFunction 接口,所以都有 open()、close()和 getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:


  • processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在 Collector 数据类型中输出。Context可以访问元素的时间戳,元素的 key,以及 TimerService 时间服务。Context还可以将结果输出到别的流(side outputs)。


  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的时间戳。Collector 为输出结果的集合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。


三、TimerService 和 定时器(Timers)


Context 和 OnTimerContext 所持有的 TimerService 对象拥有以下方法:


currentProcessingTime(): Long 返回当前处理时间

currentWatermark(): Long 返回当前 watermark 的时间戳

registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前 key 的processing time 的定时器。当 processing time 到达定时时间时,触发 timer。

registerEventTimeTimer(timestamp: Long): Unit 会注册当前 key 的 event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。

deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。

当定时器 timer 触发时,会执行回调函数 onTimer()。注意定时器 timer 只能在keyed streams 上面使用。


下面举个例子说明 KeyedProcessFunction 如何操作 KeyedStream。


需求:监控温度传感器的温度值,如果温度值在一秒钟之内(processing time)连续上升,则报警。

val warnings = readings
  .keyBy(_.id) 
  .process(new TempIncreaseAlertFunction)


看一下 TempIncreaseAlertFunction 如何实现, 程序中使用了 ValueState 这样一个状态变量。

class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
 // 保存上一个传感器温度值
 lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
  new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
 )
 // 保存注册的定时器的时间戳
 lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
  new ValueStateDescriptor[Long]("timer", Types.of[Long])
 )
 override def processElement(r: SensorReading,
     ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
     out: Collector[String]): Unit = {
 // 取出上一次的温度
 val prevTemp = lastTemp.value()
 // 将当前温度更新到上一次的温度这个变量中
 lastTemp.update(r.temperature)
 val curTimerTimestamp = currentTimer.value()
 if (prevTemp == 0.0 || r.temperature < prevTemp) {
  // 温度下降或者是第一个温度值,删除定时器
  ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
  // 清空状态变量
  currentTimer.clear()
 } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
 // 温度上升且我们并没有设置定时器
   val timerTs = ctx.timerService().currentProcessingTime() + 1000
   ctx.timerService().registerProcessingTimeTimer(timerTs)
   currentTimer.update(timerTs)
  }
 }
 override def onTimer(ts: Long,
       ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
       out: Collector[String]): Unit = {
 out.collect("传感器 id 为: " + ctx.getCurrentKey + "的传感器温度值已经连续 1s 上升了。")
 currentTimer.clear()
  }
  }


四、侧输出流(SideOutput)


大部分的 DataStream API 的算子的输出是单一输出,也就是某种数据类型的流。除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。


下面是一个示例程序:

val monitoredReadings: DataStream[SensorReading] = readings
  .process(new FreezingMonitor)
monitoredReadings
  .getSideOutput(new OutputTag[String]("freezing-alarms"))
  .print()
readings.print()


接下来我们实现 FreezingMonitor 函数,用来监控传感器温度值,将温度值低于32F 的温度输出到 side output。

class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
 // 定义一个侧输出标签
 lazy val freezingAlarmOutput: OutputTag[String] =
  new OutputTag[String]("freezing-alarms")
 override def processElement(r: SensorReading,
          ctx: ProcessFunction[SensorReading, SensorReading]#Context,
        out: Collector[SensorReading]): Unit = {
 // 温度在 32F 以下时,输出警告信息
 if (r.temperature < 32.0) {
  ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
 }
 // 所有数据直接常规输出到主流
 out.collect(r)
  }
 }


五、CoProcessFunction


对于两条输入流,DataStream API 提供了 CoProcessFunction 这样的 low-level操作。CoProcessFunction 提供了操作每一个输入流的方法: processElement1()和processElement2()。


类似于 ProcessFunction,这两种方法都通过 Context 对象来调用。这个 Context对象可以访问事件数据,定时器时间戳,TimerService,以及 side outputs。CoProcessFunction 也提供了 onTimer()回调函数。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
2月前
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
191 0
|
2月前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
2月前
|
SQL 消息中间件 Java
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
SQL 消息中间件 API
Flink关系型API的公共部分
关系型程序的公共部分 下面的代码段展示了Table&SQL API所编写流式程序的程序模式: val env = StreamExecutionEnvironment.getExecutionEnvironment //创建TableEnvironment对象 val tableEnv = TableEnvironment.
2741 0
|
20天前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
577 7
阿里云实时计算Flink在多行业的应用和实践
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之从mysql读数据写到hive报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
存储 JSON Kubernetes
实时计算 Flink版操作报错合集之 写入hudi时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL 流计算
实时计算 Flink版操作报错合集之怎么向一个未定义列的表中写入数据
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。