一.引言
Flink 支持增加 DataStream KeyBy 之后 conncet BroadCastStream 形成 BroadConnectedStream,广播流内数据一般为不间断更新的上下文信息,在本例中,需要针对数据流中的用户信息,基于用于信息 + 广播流内的物料库实现推荐逻辑,针对 BroadConnectedStream 流,需要实现 KeyedBroadCastProcessFunction 完成用户流与广播流的处理,主要方法为:
ProcessElement - 根据用户流生成用户信息,根据物料库进行推荐
ProcessBroadcastElement - 获取物料库,并同步至 Context
编辑
由于任务启动时第一批物料库生成需要一定时间,而用户流则源源不断,从而导致物料库生成之前的来的用户都没有物料库进行推荐,为了保证不遗漏用户推荐,这里需要实现数据等待逻辑,让先到的用户流等待广播流的物料库生成完毕再进行推荐,从而保证不遗漏用户。
二.While True 尝试
一开始尝试带入离线的思维,既然物料库未生成无法完成推荐,则进行 while 判断和 TimeUnit 时间等待,重复判断物料库是否生成并造成线程阻塞,待物料库生成完毕再开始推荐,好处是保证不丢弃一个用户,坏处是前期需要线程堵塞,如果用户流数据过大则背压严重。
override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = { materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB // 第一批造成堵塞,知道物料库生成 while (materialDataBase == null) { TimeUnit.SECONDS.sleep(60) materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB } val sendInfos = RankUtil.batchRank(bs.userObjects, materialDataBase) sendInfos.foreach(collector.collect) } override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = { val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor) // 更新 DB if (db.isValid) { broadCastValue.put("MaterialDBContext", db) } }
BatchSendInfo 内存储一批待推荐的用户类,下述统称 UserObject,我的思路是 whilt true 检查物料库是否生成,未生成则等待 60s 再重新从 readOnlyContext 上下文中获取,待物料库不为 null 时执行 BatchRank 的批量排序逻辑,看上去很美好,但是实践后得到的是死循环。
原因分析:
对于当前处理的 bs: BatchSendInfo,其 context 在 processBroadcastElement 后已经不再更新,我理想化的情况是等到新的 MaterialDataBase 传输后在这里更新 context,但是由于 context 在当前 processFunction 内不再更新,所以我的 while true 是死循环,所以这个方案 pass,这个方案只能适用于 MaterialDataBase 在另外线程生成并能更新到当前线程的场景。
三.ValueState 缓存尝试 👍
还有另外一种方法,就是当物料库不可用时,将先到的数据存到 ValueState 中并设置延时处理,延时时长可以设定为物料库初始化时间左右,待 onTimer 时判断物料库状态,如果物料库初始化成功则执行推荐逻辑,未成功则继续存储至 ValueState,其实本质上和 While True 类似,只不过变成一直存储了,缺点是如果前期数据过多会造成缓存量较大,不过可以通过加大 Heap 或者采用 RocksDB 轻松解决。
override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = { materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB val lastBatchUserObject = state.value val combineBS = if (lastBatchUserObject == null) { bs } else { val allUser = new ArrayBuffer[DpaUserObject]() allUser ++= bs.userObjects allUser ++= lastBatchUserObject.userObjects BatchSendInfo(allUser.toArray, readOnlyContext.getCurrentKey) } if (materialDataBase == null) { // 物料库不可用 readOnlyContext.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime) state.update(combineBS) } else { // 物料库可用 val sendInfos = RankUtil.batchRank(combineBS.userObjects, materialDataBase) sendInfos.foreach(sendInfo => { collector.collect(sendInfo) }) } } override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = { val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor) // 更新 DB if (db.isValid) { broadCastValue.put("MaterialDBContext", db) } }
ProcessBroadcastElement 方法未改变,只是修改了 ProcessElement 方法:
A.lastBatchUserObject 判断当前 key 是否存在已经缓存的批用户
B.CombineBS 用户合并当前 key 需要处理的用户批
C.如果物料库为 null,则将当前批用户存入 ValueState 并设置 expire 过期时间,这个时间可以基于你物料库生成时间,例如物料库正常情况下50s生成,则设置60s过期,保证到期后物料库可用,不需要持续缓存
D.如果物料库已经可用则直接执行 BatchRank 推荐逻辑
所以这里主要就两件事,合并批用户,判断物料库状态决定批用户是存储还是计算。
除了 Process 函数,还包含 onTimer 函数:
override def onTimer(timestamp: Long, ctx: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#OnTimerContext, out: Collector[SendInfo]): Unit = { val batchBS = state.value() materialDataBase = ctx.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB if (!batchBS.equals(null) && !materialDataBase.equals(null)) { // 物料库可用,批量下发 val sendInfos = RankUtil.batchRank(batchBS.userObjects, materialDataBase) sendInfos.foreach(sendInfo => { out.collect(sendInfo) }) } else { // 清除状态 state.clear() } }
onTimer 单独处理到期的批用户,这里重新获取 materialDataBase,如果批用户和物料库都不为 null 则执行批推荐逻辑,否则清理批用户 state.clear(),我这里会损失数据,如果不想损失数据则将 else 逻辑修改为与 ProcessElement 一致,如果物料库经过 expireTime 还未成功,则继续缓存数据,直到下一个 expireTime 周期,循环往复:
context.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime) state.update(combineBS)
四.总结
上述代码中的 BatchSendInfo 可以看做是自己的 Source 类,MaterialDataBase 可以看做是自己的广播流上下文,面对需要等到广播流初始化完毕的需求则修改上述对应代码即可,expireTime 则根据广播流变量初始化时间进行设定,缓存方法本地测试缓存159批数据,到期处理159批数据,延迟和存储要求都不高,非常的奈斯~