Flink / Scala - ProcessFunction 之间共用缓存测试

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式:A.ValueState 缓存B.HashMapCache 缓存C.RedisCahce 缓存...

一.引言

Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1 的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式:

A.ValueState 缓存

B.HashMapCache 缓存

C.RedisCahce 缓存

image.gif编辑

Tips:

后续测试均基于下述自定义 Source,该 Source 周期性产生一批数字并生成 InputData 类:

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import java.util.concurrent.TimeUnit
class SourceFromCollection extends RichSourceFunction[InputData] {
  private var isRunning = true
  var start = 0
  override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
    while ( {
      isRunning
    }) {
      (start to (start + 100)).foreach(num => {
        ctx.collect(InputData(num))
      })
      start += 100
      TimeUnit.SECONDS.sleep(5)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

image.gif

InputData 类很简单,只包含对应数字一个属性:

case class InputData(num: Int)

image.gif

二.ValueState 缓存

ValueState 有状态算子之前已经分析过 Flink 有状态算子和应用Demo详解,其可以针对 keyedStream 在 ProcessFunction 函数内存储状态,并在相同 key 触达时获取缓存的状态并决定更新或者舍弃,基于这个特性,下面尝试一下在 ProcessV1 中缓存 key 的状态,在 ProcessV2 中看看能否通过相同 key 获取状态,如果实现则实现了同一 key 内容的缓存并实现去重。

1.ProcessV1 && ProcessV2

ProcessV1 函数负责将 InputData 的 num 保存至 ValueState[String] 中,同时 ProcessV1 只负责传递信息,所以 inputData 作为输入存储信息后又通过 collector 发出到 ProcessV2 中。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
class ProcessV1 extends ProcessFunction[InputData ,InputData] {
  var numValueState: ValueState[String] = _
  // 获取状态值
  override def open(parameters: Configuration): Unit = {
    numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String]))
  }
  // 更新状态
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    val numInfo = numValueState.value()
    if (numInfo == null) {
      numValueState.update(s"${input.num} Save By ProcessV1")
    }
    collector.collect(input)
  }
}

image.gif

ProcessV2 函数负责根据 InputData 的 num key 获取 ValueState[String] ,如果 ValueState.value() 不为 null 则代表缓存成功,同一个 key 的状态可以在两个 ProcessFunction 之间传递,缓存成功,反之则不可以。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
class ProcessV2 extends ProcessFunction[InputData ,String] {
  var numValueState: ValueState[String] = _
  // 获取状态值
  override def open(parameters: Configuration): Unit = {
    numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String]))
  }
  // 更新状态
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    val numInfo = numValueState.value()
    val stateInfo = if (numInfo == null) {
      s"${input.num} No Cache Info"
    } else {
      numInfo
    }
    collector.collect(stateInfo)
  }
}

image.gif

2.main 主函数

为了保证同一 Key 的访问,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CacheByValueState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new ProcessV1())
      .keyBy(_.num)
      .process(new ProcessV2())
      .print()
    env.execute()
  }
}

image.gif

经过测试,通过 ValueState 在两个 ProcessFunction 之间传递的缓存方式宣告失败:

image.gif编辑

三.HashMap 缓存

上面方式不通后,想到了最简单的去重缓存方式: HashMap,直接在构造 ProcessFunction 时候将 HashMap 作为参数传入。

1.HashMapCacheV1 && HashMapCacheV2

HashMapCacheV1 函数与上面实现细节相同,数据 input 进入并通过 collector 传递,只负责在该阶段将 num 计入 HashMap 缓存内,为了保证线程安全,这里使用 ConcurrentHashMap 代替 HashMap。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.util.concurrent.ConcurrentHashMap
class HashMapCacheV1(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,InputData] {
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    if (!cache.contains(input.num)) {
      cache.put(input.num, s"${input.num} 计入缓存!")
    }
    println("V1: " + cache)
    collector.collect(input)
  }
}

image.gif

直接判断 num 是否存在于 ConcurrentHashMap 中,并将结果输出,为了可视化 cache,这里 V1、V2 函数都对 HashMap 进行了 print 的打印。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.util.concurrent.ConcurrentHashMap
class HashMapCacheV2(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,String] {
  // 获取缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    println("V2: " + cache)
    val output = cache.getOrDefault(input.num, s"${input.num} Not In Cache!")
    collector.collect(output)
  }

image.gif

2.main 主函数

同样为了保证同一个 key,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印,这里构造 Map 后直接当做传参数传给 ProcessFunction。

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.concurrent.ConcurrentHashMap
object CacheByHashMap {
  def main(args: Array[String]): Unit = {
    val hashMapCache = new ConcurrentHashMap[Int, String]()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new HashMapCacheV1(hashMapCache))
      .keyBy(_.num)
      .process(new HashMapCacheV2(hashMapCache))
      .print()
    env.execute()
  }
}

image.gif

经过测试,通过 HashMap 缓存的方法宣告失败,可以看到虽然传给 V1 和 V2 函数的是同一个 HashMap 且同步,但在实际工作中两个 Cahe 似乎是隔离的,cache 可以在 V1 阶段缓存,但 V2 阶段 cache 则完全为空,故 HashMap 不能实现两个 ProcessFunction 之间的缓存。

image.gif编辑

四.Redis 缓存 👍

经过上面两个缓存尝试失败后,这里直接上 Redis 缓存实现同步 cache,这里需要注意 redis 不能像上面 HashMap 一样直接传给 ProcessFunction:

image.gif编辑

因为涉及到序列化的问题:

image.gif编辑

Flink 序列化的问题之前也写过: Flink 序列化问题,遇到这些无法序列的话类可以通过 ProcessFunction 的 open 方法进行初始化,下面通过 open 方法初始化 redis 并尝试实现 cache 缓存。

1.RedisCacheV1 && RedisCacheV2

V1 函数负责传递 InputData 同时通过 redis set 完成去重与缓存。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
class RedisCacheV1(cache: Jedis) extends ProcessFunction[InputData ,InputData] {
  val setKey = "redisCacheTestKey"
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    // 不存在添加至集合
    if (!cache.sismember(setKey, input.num.toString)) {
      cache.sadd(setKey, input.num.toString)
    }
    collector.collect(input)
  }
}

image.gif

V2 函数负责判断元素是否存在并向下游传递打印日志。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
class RedisCacheV2(cache: Jedis) extends ProcessFunction[InputData ,String] {
  val setKey = "redisCacheTestKey"
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    if (cache.sismember(setKey, input.num.toString)) {
      collector.collect(input + " is in Cache")
    } else {
      collector.collect(input + " is not in Cache")
    }
  }
}

image.gif

2.main 主函数

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CacheByRedisV2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new OpenRedisCacheV1())
      .keyBy(_.num)
      .process(new OpenRedisCacheV2())
      .print()
    env.execute()
  }
}

image.gif

测试成功了,通过 Redis 能实现 flink 不同 ProcessFunction 之间的全局缓存。

image.gif编辑

五.总结

上面总共实现了三种缓存方式:

A.ValueState 缓存 ❌

B.HashMapCache 缓存 ❌

C.RedisCahce 缓存 成功 ✔️

ValueState 失败是因为两个 ProcessFunction 的 RuntimeContext 上下文不同,所以获取相同 key 的缓存时出现差异,这个也比较好理解,如果两个位置用户做不同类型的缓存,而多个 ProcessFunction 公用相同 key,也会发生很多问题。HashMapCache 一是因为处于不同 RuntimeContext,其次每个 HashMap 所处的 task 可能不共享一份内存,这里有点类似 Spark 中的 partition,每个缓存 HashMap 只能存储到对应 Task 下的数据,而不能坐到全局缓存。综上如果想在 ProcessFunction 之间做全局缓存,除了 Redis,还可以选择 MongoDB、Hbase、ES 等数据库,具体可以根据场景和 QPS 作区分,同时需要注意数据库同步需要时间,注意一些异常值以及边缘条件的判断。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
存储 监控 API
Flink的每个key状态 和每个并行度的状态 怎么测试区分?
Flink的每个key状态 和每个并行度的状态 怎么测试区分?
39 0
|
1天前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
294 3
|
1天前
|
监控 关系型数据库 MySQL
Flink CDC产品常见问题之使用3.0测试mysql到starrocks启动报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1天前
|
存储 API 流计算
要测试和区分Flink的每个key状态和每个并行度的状态
【2月更文挑战第23天】要测试和区分Flink的每个key状态和每个并行度的状态
14 1
|
6月前
|
SQL 存储 缓存
Flink CDC中flink sql 如果缓存起来所有的数据,然后基于这个数据做查询?
Flink CDC中flink sql 如果缓存起来所有的数据,然后基于这个数据做查询?
70 1
|
7月前
|
缓存 Java 流计算
这个问题可能是由于Flink的库缓存导致的
这个问题可能是由于Flink的库缓存导致的
23 3
|
Java Apache 流计算
|
NoSQL 网络安全 Redis
flink测试redis sink报错
flink测试redis sink报错
|
监控 大数据 流计算
环境篇之 flink 的集群测试|学习笔记
快速学习环境篇之 flink 的集群测试
232 0
环境篇之 flink 的集群测试|学习笔记
|
大数据 流计算 开发者
环境篇之 flink 的 historyserver 的测试|学习笔记
快速学习环境篇之 flink 的 historyserver 的测试
380 0
环境篇之 flink 的 historyserver 的测试|学习笔记