Flink / Scala - ProcessFunction 之间共用缓存测试

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
云数据库 Tair(兼容Redis),内存型 2GB
简介: Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式:A.ValueState 缓存B.HashMapCache 缓存C.RedisCahce 缓存...

一.引言

Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1 的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式:

A.ValueState 缓存

B.HashMapCache 缓存

C.RedisCahce 缓存

image.gif编辑

Tips:

后续测试均基于下述自定义 Source,该 Source 周期性产生一批数字并生成 InputData 类:

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import java.util.concurrent.TimeUnit
class SourceFromCollection extends RichSourceFunction[InputData] {
  private var isRunning = true
  var start = 0
  override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
    while ( {
      isRunning
    }) {
      (start to (start + 100)).foreach(num => {
        ctx.collect(InputData(num))
      })
      start += 100
      TimeUnit.SECONDS.sleep(5)
    }
  }
  override def cancel(): Unit = {
    isRunning = false
  }
}

image.gif

InputData 类很简单,只包含对应数字一个属性:

case class InputData(num: Int)

image.gif

二.ValueState 缓存

ValueState 有状态算子之前已经分析过 Flink 有状态算子和应用Demo详解,其可以针对 keyedStream 在 ProcessFunction 函数内存储状态,并在相同 key 触达时获取缓存的状态并决定更新或者舍弃,基于这个特性,下面尝试一下在 ProcessV1 中缓存 key 的状态,在 ProcessV2 中看看能否通过相同 key 获取状态,如果实现则实现了同一 key 内容的缓存并实现去重。

1.ProcessV1 && ProcessV2

ProcessV1 函数负责将 InputData 的 num 保存至 ValueState[String] 中,同时 ProcessV1 只负责传递信息,所以 inputData 作为输入存储信息后又通过 collector 发出到 ProcessV2 中。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
class ProcessV1 extends ProcessFunction[InputData ,InputData] {
  var numValueState: ValueState[String] = _
  // 获取状态值
  override def open(parameters: Configuration): Unit = {
    numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String]))
  }
  // 更新状态
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    val numInfo = numValueState.value()
    if (numInfo == null) {
      numValueState.update(s"${input.num} Save By ProcessV1")
    }
    collector.collect(input)
  }
}

image.gif

ProcessV2 函数负责根据 InputData 的 num key 获取 ValueState[String] ,如果 ValueState.value() 不为 null 则代表缓存成功,同一个 key 的状态可以在两个 ProcessFunction 之间传递,缓存成功,反之则不可以。

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
class ProcessV2 extends ProcessFunction[InputData ,String] {
  var numValueState: ValueState[String] = _
  // 获取状态值
  override def open(parameters: Configuration): Unit = {
    numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String]))
  }
  // 更新状态
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    val numInfo = numValueState.value()
    val stateInfo = if (numInfo == null) {
      s"${input.num} No Cache Info"
    } else {
      numInfo
    }
    collector.collect(stateInfo)
  }
}

image.gif

2.main 主函数

为了保证同一 Key 的访问,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印:

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CacheByValueState {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new ProcessV1())
      .keyBy(_.num)
      .process(new ProcessV2())
      .print()
    env.execute()
  }
}

image.gif

经过测试,通过 ValueState 在两个 ProcessFunction 之间传递的缓存方式宣告失败:

image.gif编辑

三.HashMap 缓存

上面方式不通后,想到了最简单的去重缓存方式: HashMap,直接在构造 ProcessFunction 时候将 HashMap 作为参数传入。

1.HashMapCacheV1 && HashMapCacheV2

HashMapCacheV1 函数与上面实现细节相同,数据 input 进入并通过 collector 传递,只负责在该阶段将 num 计入 HashMap 缓存内,为了保证线程安全,这里使用 ConcurrentHashMap 代替 HashMap。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.util.concurrent.ConcurrentHashMap
class HashMapCacheV1(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,InputData] {
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    if (!cache.contains(input.num)) {
      cache.put(input.num, s"${input.num} 计入缓存!")
    }
    println("V1: " + cache)
    collector.collect(input)
  }
}

image.gif

直接判断 num 是否存在于 ConcurrentHashMap 中,并将结果输出,为了可视化 cache,这里 V1、V2 函数都对 HashMap 进行了 print 的打印。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import java.util.concurrent.ConcurrentHashMap
class HashMapCacheV2(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,String] {
  // 获取缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    println("V2: " + cache)
    val output = cache.getOrDefault(input.num, s"${input.num} Not In Cache!")
    collector.collect(output)
  }

image.gif

2.main 主函数

同样为了保证同一个 key,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印,这里构造 Map 后直接当做传参数传给 ProcessFunction。

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.concurrent.ConcurrentHashMap
object CacheByHashMap {
  def main(args: Array[String]): Unit = {
    val hashMapCache = new ConcurrentHashMap[Int, String]()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new HashMapCacheV1(hashMapCache))
      .keyBy(_.num)
      .process(new HashMapCacheV2(hashMapCache))
      .print()
    env.execute()
  }
}

image.gif

经过测试,通过 HashMap 缓存的方法宣告失败,可以看到虽然传给 V1 和 V2 函数的是同一个 HashMap 且同步,但在实际工作中两个 Cahe 似乎是隔离的,cache 可以在 V1 阶段缓存,但 V2 阶段 cache 则完全为空,故 HashMap 不能实现两个 ProcessFunction 之间的缓存。

image.gif编辑

四.Redis 缓存 👍

经过上面两个缓存尝试失败后,这里直接上 Redis 缓存实现同步 cache,这里需要注意 redis 不能像上面 HashMap 一样直接传给 ProcessFunction:

image.gif编辑

因为涉及到序列化的问题:

image.gif编辑

Flink 序列化的问题之前也写过: Flink 序列化问题,遇到这些无法序列的话类可以通过 ProcessFunction 的 open 方法进行初始化,下面通过 open 方法初始化 redis 并尝试实现 cache 缓存。

1.RedisCacheV1 && RedisCacheV2

V1 函数负责传递 InputData 同时通过 redis set 完成去重与缓存。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
class RedisCacheV1(cache: Jedis) extends ProcessFunction[InputData ,InputData] {
  val setKey = "redisCacheTestKey"
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = {
    // 不存在添加至集合
    if (!cache.sismember(setKey, input.num.toString)) {
      cache.sadd(setKey, input.num.toString)
    }
    collector.collect(input)
  }
}

image.gif

V2 函数负责判断元素是否存在并向下游传递打印日志。

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
class RedisCacheV2(cache: Jedis) extends ProcessFunction[InputData ,String] {
  val setKey = "redisCacheTestKey"
  // 添加至缓存
  override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = {
    if (cache.sismember(setKey, input.num.toString)) {
      collector.collect(input + " is in Cache")
    } else {
      collector.collect(input + " is not in Cache")
    }
  }
}

image.gif

2.main 主函数

import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CacheByRedisV2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(new SourceFromCollection)
      .setParallelism(1)
      .keyBy(_.num)
      .process(new OpenRedisCacheV1())
      .keyBy(_.num)
      .process(new OpenRedisCacheV2())
      .print()
    env.execute()
  }
}

image.gif

测试成功了,通过 Redis 能实现 flink 不同 ProcessFunction 之间的全局缓存。

image.gif编辑

五.总结

上面总共实现了三种缓存方式:

A.ValueState 缓存 ❌

B.HashMapCache 缓存 ❌

C.RedisCahce 缓存 成功 ✔️

ValueState 失败是因为两个 ProcessFunction 的 RuntimeContext 上下文不同,所以获取相同 key 的缓存时出现差异,这个也比较好理解,如果两个位置用户做不同类型的缓存,而多个 ProcessFunction 公用相同 key,也会发生很多问题。HashMapCache 一是因为处于不同 RuntimeContext,其次每个 HashMap 所处的 task 可能不共享一份内存,这里有点类似 Spark 中的 partition,每个缓存 HashMap 只能存储到对应 Task 下的数据,而不能坐到全局缓存。综上如果想在 ProcessFunction 之间做全局缓存,除了 Redis,还可以选择 MongoDB、Hbase、ES 等数据库,具体可以根据场景和 QPS 作区分,同时需要注意数据库同步需要时间,注意一些异常值以及边缘条件的判断。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
202 0
|
3月前
|
Java 流计算
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
58 1
Flink-03 Flink Java 3分钟上手 Stream 给 Flink-02 DataStreamSource Socket写一个测试的工具!
|
3月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
73 3
|
3月前
|
分布式计算 Java 大数据
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
大数据-122 - Flink Time Watermark Java代码测试实现Tumbling Window
52 0
|
5月前
|
缓存 NoSQL 测试技术
【Azure Redis 缓存 Azure Cache For Redis】使用Redis自带redis-benchmark.exe命令测试Azure Redis的性能
【Azure Redis 缓存 Azure Cache For Redis】使用Redis自带redis-benchmark.exe命令测试Azure Redis的性能
|
5月前
|
缓存 NoSQL 网络协议
【Azure Redis 缓存 Azure Cache For Redis】在创建高级层Redis(P1)集成虚拟网络(VNET)后,如何测试VNET中资源如何成功访问及配置白名单的效果
【Azure Redis 缓存 Azure Cache For Redis】在创建高级层Redis(P1)集成虚拟网络(VNET)后,如何测试VNET中资源如何成功访问及配置白名单的效果
|
5月前
|
SQL 缓存 监控
实时计算 Flink版产品使用问题之怎么手动清理缓存或废弃文件
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
缓存 关系型数据库 MySQL
实时计算 Flink版产品使用问题之缓存内存占用较大一般是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL Java 数据处理
实时计算 Flink版产品使用问题之使用MavenShadePlugin进行relocation并遇到只包含了Java代码而未包含Scala代码,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之测试使用initial模式,使用savepoint停掉再加上表,不会做全量同步,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。