Flink / Scala - DataStream Broadcast State 模式示例详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 一.引言上一篇文章Flink / Scala - DataSet 应用 Broadcast Variables介绍了 DataSet 场景下 Broadcast 的使用,这一边

 一.引言

上一篇文章 Flink / Scala - DataSet 应用 Broadcast Variables 介绍了 DataSet 场景下 Broadcast 的使用,本文将介绍 DataStream 中的 Broadcast 应用场景,与 DataSet 类似,Broadcast 的值是所有 task 公用的,Broadcast  State 是为 DataStreaming 所有 task 定制的可实时修改的公用值。

二.代码常规介绍

DataStream<T> output = dataStream
                 .connect(BroadcastStream)
                 .process(
                     // KeyedBroadcastProcessFunction 中的类型参数表示:
                     //   1. key stream 中的 key 类型
                     //   2. 非广播流中的元素类型
                     //   3. 广播流中的元素类型
                     //   4. 结果的类型,在这里是 string
                     new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() {
                         // 模式匹配逻辑
                     }
                 );

image.gif

常规使用中我们都包含一个数据流 DataStream,其中包含我们需要处理的数据,如果处理逻辑会随着一个状态值的改变而改变,这是可以引入第二个数据流成为广播流 BroadcastStream,通过调用 DataStream 的 connect 方法,并将 BroadcastStream 参数传入即可获得一个 BroadcastConnectedStream,这时数据同时包含数据流和状态流,需要重写 process 函数处理两个流的数据,根据 DataStream 是否是 Keyd-Stream,Process 方法分为:

· keyed 流,那就是 KeyedBroadcastProcessFunction 类型

· non-keyed 流,那就是 BroadcastProcessFunction 类型

在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中,我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。 两个子类型定义如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

image.gif

需要注意的是 processBroadcastElement() 负责处理广播流的元素,而 processElement() 负责处理另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:

得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)

查询元素的时间戳:ctx.timestamp()

查询目前的 Watermark:ctx.currentWatermark()

目前的处理时间 (processing time):ctx.currentProcessingTime()

产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)

三.应用实例

上面说的比较官方,下面通过一个简单的例子理解一下 BroadCast Value 和 BroadCast Stream 的用处,上面提到了 BroadCast Stream 作为一个状态流控制 DataStream 的数据输出,下面实现以下功能:

DataStream: 定期生成 num - 100+num 的 100 个数字,每次生成周期初始化 num + 100

BroadCastStream:不定期传入状态控制输出状态,分为 odd-单数 even-双数

Sink:根据 odd 和 even 的状态,print 输出 100 个数字中的单数或者双数

1.DataStream

5s 中生成 num - (num+100) 的数字,下一批数据比上一批增加 100,这里继承 RichSourceFunction 自定义 Source 来源然后通过 addSource 实现,完整的 DataStream Source 生成方法参考: Flink / Scala - DataSource 之 DataStream 获取数据总结

// 每5s生成一批数据 数据流
    case class InputData(num: Int)
    class SourceFromCollection extends RichSourceFunction[InputData] {
      private var isRunning = true
      var start = 0
      override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
        while ( {
          isRunning
        }) {
          (start to (start + 100)).foreach(num => {
            ctx.collect(InputData(num))
          })
          start += 100
          TimeUnit.SECONDS.sleep(5)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)

image.gif

上述流根据 num- num+100 的数字生成 InputData 类,并通过 keyBy 生成 Keyd-Stream。

2.BroadCastStream

BroadCastStream 广播流即本例中的状态流,这里通过 File 传递状态值并解析,同样是继承 RichFunction 实现自定义的 Source,每 1s 从对应文件读取,获取是否有新的状态传入。

// MapStateDescriptor odd: 奇数 even: 偶数
    case class FilterState(state: String)    
    // 每s监控一次文件,并读取最新的状态
    class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("./data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
    // 广播流,广播规则并且创建 BroadCast
    val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() {
      override def map(in: String): FilterState = {
        FilterState(in)
      }
    }).broadcast(ruleStateDescriptor)

image.gif

stateDescriptor 负责声明广播状态的类型,这里定义为 MapStateDescriptor ,后续通过 String 类型的 key 即可获取对应的 FilterState,从而决定 DataStream 中的数据如何 sink。

3.合并 DataStream 与 BroadCastStream

DataStream.connect(BroadCastStream),由于原始 DataStream 为 keyd-stream,所以使用 keyedBroadcastProcessFunction,共包含四个参数:

· ks - keyBy 字段的类型,这里根据 InputData.num keyBy,所以是 Int

· IN1 - DataStream 数据流的类型,这里是 InputData

· IN2 - BroadCastStream 广播流的类型,这里是 FilterState

· OUT - Sink 输出端为直接输出 Print String,所以为 String

keyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] {
      // 与之前的 Descriptor 相同
      val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
      override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = {
        val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey")
        val filterState = if (filterStateClass == null) {
          "odd"
        } else {
          filterStateClass.state
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }
      }
      override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = {
        // 从广播中获取规则
        val broadCastValue = context.getBroadcastState(ruleStateDescriptor)
        broadCastValue.put("broadcastStateKey", filterState)
        println(s"Rule Changed: ${filterState.state}")
      }
    }).setParallelism(1).print()

image.gif

A. ProcessElement

该方法负责输出数据,根据 FilterState 的状态是 odd-单数 还是 even-双数,状态默认为 odd-单数。通过 context.getBroadcastState(StateDescriptor) 方法获取 BroadcastStream 中的 FilterState 数据。注意这里的 StateDescriptor 要与上面初始化的 StateDescriptor 保持一致。

B. ProcessBroadcastElement

该方法负责处理 Broadcast 数据流并更新至 context,从而其他 task 节点在执行 processElement 方法时获取最新的状态值,这里 put 的 Key 和上述方法 get 的 Key 需要保持一致,否则获取状态值为 null。

4. 测试

为了本地测试方便查看,两个 Stream 的 parallelism 都设置为1。

状态文件 File 为空,此时默认状态为 odd,输出单数:

image.gif编辑

文件内增加一行 even,并 ctrl s 保存,此时 Broadcast 1s 的间隔检测到新状态 even,处理并更细至各 task,各 task 输出偶数:

image.gif编辑

再次增加一行 odd,此时输出状态改变,重新修改为输出单数:

image.gif编辑

一个基本的 BroadcastValue 控制 DataStream 的实例就完成了,状态文件夹最终包含两行状态数据:

image.gif编辑

5.完整代码

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.commons.lang3.StringUtils
import java.io.BufferedReader
import java.io.FileReader
import java.util.concurrent.TimeUnit
object BroadCastStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 每5s生成一批数据 数据流
    case class InputData(num: Int)
    class SourceFromCollection extends RichSourceFunction[InputData] {
      private var isRunning = true
      var start = 0
      override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
        while ( {
          isRunning
        }) {
          (start to (start + 100)).foreach(num => {
            ctx.collect(InputData(num))
          })
          start += 100
          TimeUnit.SECONDS.sleep(5)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)
    // 每s监控一次文件,并读取最新的状态
    class SourceFromFile extends RichSourceFunction[String] {
      private var isRunning = true
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        val bufferedReader = new BufferedReader(new FileReader("/Users/xudong11/flink/src/main/scala/com.weibo.ug.push.flink/DataStreamingDemo/data.txt"))
        while ( {
          isRunning
        }) {
          val line = bufferedReader.readLine
          if (!StringUtils.isBlank(line)) {
            ctx.collect(line)
          }
          TimeUnit.SECONDS.sleep(1)
        }
      }
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    // MapStateDescriptor odd: 奇数 even: 偶数
    case class FilterState(state: String)
    val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
    // 广播流,广播规则并且创建 BroadCast
    val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() {
      override def map(in: String): FilterState = {
        FilterState(in)
      }
    }).broadcast(ruleStateDescriptor)
    // 连接两个流
    keyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] {
      // 与之前的 Descriptor 相同
      val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState])
      override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = {
        val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey")
        val filterState = if (filterStateClass == null) {
          "odd"
        } else {
          filterStateClass.state
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }
      }
      override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = {
        // 从广播中获取规则
        val broadCastValue = context.getBroadcastState(ruleStateDescriptor)
        broadCastValue.put("broadcastStateKey", filterState)
        println(s"Rule Changed: ${filterState.state}")
      }
    }).setParallelism(1).print()
    env.execute()
  }
}

image.gif

四.总结

1.实现步骤

Broadcast Value 通过 DataStream connect BroadCastStream 连接实现,期间注意两个 ProcessFunction 的重写与对应 StateDescriptor 的定制即可。

2.数据一致性

其次需要注意两个 processFunction 的参数 ctx,在 processElement 中 ctr 是 readOnly,因为一致性的原因,这里只允许 task 读取最新的 State 但不能修改;相反的 processBroadcastElement 方法中的 context 允许修改其中 value 状态的值,注意这里的逻辑要保持全局的一致性(增加随机数随机修改状态值可视作是不保持全局唯一性的操作),否则会造成状态不同从而导致 task 端输出不一致。

3.CheckPoint

所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点即 hotspot 。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态 / 改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。

4.State Backend

broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State,因此不使用 RocksDB state backend。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4天前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
18 2
|
1天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何将changelog转换为append模式
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
关系型数据库 MySQL Serverless
实时计算 Flink版产品使用问题之原生Session模式下遇到classpath路径未生效,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
NoSQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之应用程序模式的作业如何从检查点保存的地方启动
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错合集之向远端flink提交cdc模式作业时,连接池中的连接超时,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之使用MavenShadePlugin进行relocation并遇到只包含了Java代码而未包含Scala代码,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
21天前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
580 7
阿里云实时计算Flink在多行业的应用和实践
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1天前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之怎么调整Flink Web U显示的日志行数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。