一.引言
Flink 开发中有如下场景,数据需要经过两次 ProcessFunction 处理,第一步 ProcessV1 的一些信息重复不想通过每条数据传输至 ProcessV2,这时便捷的方法时对 ProcessV1 需要存储的元素进行去重缓存,保证全局共用一份缓存,可以有效减少储存空间,下面分别尝试三种缓存方式:
A.ValueState 缓存
B.HashMapCache 缓存
C.RedisCahce 缓存
编辑
Tips:
后续测试均基于下述自定义 Source,该 Source 周期性产生一批数字并生成 InputData 类:
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import java.util.concurrent.TimeUnit class SourceFromCollection extends RichSourceFunction[InputData] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = { while ( { isRunning }) { (start to (start + 100)).foreach(num => { ctx.collect(InputData(num)) }) start += 100 TimeUnit.SECONDS.sleep(5) } } override def cancel(): Unit = { isRunning = false } }
InputData 类很简单,只包含对应数字一个属性:
case class InputData(num: Int)
二.ValueState 缓存
ValueState 有状态算子之前已经分析过 Flink 有状态算子和应用Demo详解,其可以针对 keyedStream 在 ProcessFunction 函数内存储状态,并在相同 key 触达时获取缓存的状态并决定更新或者舍弃,基于这个特性,下面尝试一下在 ProcessV1 中缓存 key 的状态,在 ProcessV2 中看看能否通过相同 key 获取状态,如果实现则实现了同一 key 内容的缓存并实现去重。
1.ProcessV1 && ProcessV2
ProcessV1 函数负责将 InputData 的 num 保存至 ValueState[String] 中,同时 ProcessV1 只负责传递信息,所以 inputData 作为输入存储信息后又通过 collector 发出到 ProcessV2 中。
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector class ProcessV1 extends ProcessFunction[InputData ,InputData] { var numValueState: ValueState[String] = _ // 获取状态值 override def open(parameters: Configuration): Unit = { numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String])) } // 更新状态 override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = { val numInfo = numValueState.value() if (numInfo == null) { numValueState.update(s"${input.num} Save By ProcessV1") } collector.collect(input) } }
ProcessV2 函数负责根据 InputData 的 num key 获取 ValueState[String] ,如果 ValueState.value() 不为 null 则代表缓存成功,同一个 key 的状态可以在两个 ProcessFunction 之间传递,缓存成功,反之则不可以。
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector class ProcessV2 extends ProcessFunction[InputData ,String] { var numValueState: ValueState[String] = _ // 获取状态值 override def open(parameters: Configuration): Unit = { numValueState = getRuntimeContext.getState(new ValueStateDescriptor[String]("NumInfo", classOf[String])) } // 更新状态 override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = { val numInfo = numValueState.value() val stateInfo = if (numInfo == null) { s"${input.num} No Cache Info" } else { numInfo } collector.collect(stateInfo) } }
2.main 主函数
为了保证同一 Key 的访问,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印:
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object CacheByValueState { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.addSource(new SourceFromCollection) .setParallelism(1) .keyBy(_.num) .process(new ProcessV1()) .keyBy(_.num) .process(new ProcessV2()) .print() env.execute() } }
经过测试,通过 ValueState 在两个 ProcessFunction 之间传递的缓存方式宣告失败:
编辑
三.HashMap 缓存
上面方式不通后,想到了最简单的去重缓存方式: HashMap,直接在构造 ProcessFunction 时候将 HashMap 作为参数传入。
1.HashMapCacheV1 && HashMapCacheV2
HashMapCacheV1 函数与上面实现细节相同,数据 input 进入并通过 collector 传递,只负责在该阶段将 num 计入 HashMap 缓存内,为了保证线程安全,这里使用 ConcurrentHashMap 代替 HashMap。
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector import java.util.concurrent.ConcurrentHashMap class HashMapCacheV1(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,InputData] { // 添加至缓存 override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = { if (!cache.contains(input.num)) { cache.put(input.num, s"${input.num} 计入缓存!") } println("V1: " + cache) collector.collect(input) } }
直接判断 num 是否存在于 ConcurrentHashMap 中,并将结果输出,为了可视化 cache,这里 V1、V2 函数都对 HashMap 进行了 print 的打印。
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector import java.util.concurrent.ConcurrentHashMap class HashMapCacheV2(cache: ConcurrentHashMap[Int, String]) extends ProcessFunction[InputData ,String] { // 获取缓存 override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = { println("V2: " + cache) val output = cache.getOrDefault(input.num, s"${input.num} Not In Cache!") collector.collect(output) }
2.main 主函数
同样为了保证同一个 key,所以都使用 _.num 作为 keyBy 的依据,最后将结果是否 cache 打印,这里构造 Map 后直接当做传参数传给 ProcessFunction。
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import java.util.concurrent.ConcurrentHashMap object CacheByHashMap { def main(args: Array[String]): Unit = { val hashMapCache = new ConcurrentHashMap[Int, String]() val env = StreamExecutionEnvironment.getExecutionEnvironment env.addSource(new SourceFromCollection) .setParallelism(1) .keyBy(_.num) .process(new HashMapCacheV1(hashMapCache)) .keyBy(_.num) .process(new HashMapCacheV2(hashMapCache)) .print() env.execute() } }
经过测试,通过 HashMap 缓存的方法宣告失败,可以看到虽然传给 V1 和 V2 函数的是同一个 HashMap 且同步,但在实际工作中两个 Cahe 似乎是隔离的,cache 可以在 V1 阶段缓存,但 V2 阶段 cache 则完全为空,故 HashMap 不能实现两个 ProcessFunction 之间的缓存。
编辑
四.Redis 缓存 👍
经过上面两个缓存尝试失败后,这里直接上 Redis 缓存实现同步 cache,这里需要注意 redis 不能像上面 HashMap 一样直接传给 ProcessFunction:
编辑
因为涉及到序列化的问题:
编辑
Flink 序列化的问题之前也写过: Flink 序列化问题,遇到这些无法序列的话类可以通过 ProcessFunction 的 open 方法进行初始化,下面通过 open 方法初始化 redis 并尝试实现 cache 缓存。
1.RedisCacheV1 && RedisCacheV2
V1 函数负责传递 InputData 同时通过 redis set 完成去重与缓存。
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector import redis.clients.jedis.Jedis class RedisCacheV1(cache: Jedis) extends ProcessFunction[InputData ,InputData] { val setKey = "redisCacheTestKey" // 添加至缓存 override def processElement(input: InputData, context: ProcessFunction[InputData, InputData]#Context, collector: Collector[InputData]): Unit = { // 不存在添加至集合 if (!cache.sismember(setKey, input.num.toString)) { cache.sadd(setKey, input.num.toString) } collector.collect(input) } }
V2 函数负责判断元素是否存在并向下游传递打印日志。
import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector import redis.clients.jedis.Jedis class RedisCacheV2(cache: Jedis) extends ProcessFunction[InputData ,String] { val setKey = "redisCacheTestKey" // 添加至缓存 override def processElement(input: InputData, context: ProcessFunction[InputData, String]#Context, collector: Collector[String]): Unit = { if (cache.sismember(setKey, input.num.toString)) { collector.collect(input + " is in Cache") } else { collector.collect(input + " is not in Cache") } } }
2.main 主函数
import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment object CacheByRedisV2 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.addSource(new SourceFromCollection) .setParallelism(1) .keyBy(_.num) .process(new OpenRedisCacheV1()) .keyBy(_.num) .process(new OpenRedisCacheV2()) .print() env.execute() } }
测试成功了,通过 Redis 能实现 flink 不同 ProcessFunction 之间的全局缓存。
编辑
五.总结
上面总共实现了三种缓存方式:
A.ValueState 缓存 ❌
B.HashMapCache 缓存 ❌
C.RedisCahce 缓存 成功 ✔️
ValueState 失败是因为两个 ProcessFunction 的 RuntimeContext 上下文不同,所以获取相同 key 的缓存时出现差异,这个也比较好理解,如果两个位置用户做不同类型的缓存,而多个 ProcessFunction 公用相同 key,也会发生很多问题。HashMapCache 一是因为处于不同 RuntimeContext,其次每个 HashMap 所处的 task 可能不共享一份内存,这里有点类似 Spark 中的 partition,每个缓存 HashMap 只能存储到对应 Task 下的数据,而不能坐到全局缓存。综上如果想在 ProcessFunction 之间做全局缓存,除了 Redis,还可以选择 MongoDB、Hbase、ES 等数据库,具体可以根据场景和 QPS 作区分,同时需要注意数据库同步需要时间,注意一些异常值以及边缘条件的判断。