开发者社区> 问答> 正文

如何使用状态作为缓存

我想从状态中读取历史。如果state为null,则读取hbase并更新状态并使用onTimer设置状态ttl。问题是如何批量读取hbase,因为从hbase读取单个记录效率不高。

展开
收起
社区小助手 2018-12-11 16:13:15 1641 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    通常,如果要从Flink中的外部数据库缓存/镜像状态,最高效的方法是将数据库突变流式传输到Flink中 - 换句话说,将Flink转换为数据库更改数据捕获的复制端点(CDC) )stream,如果数据库支持那个。

    我没有使用hbase的经验,但https://github.com/mravi/hbase-connect-kafka是一个可能有用的例子(通过将kafka放在hbase和flink之间)。

    如果您希望从Flink查询hbase,并且希望避免一次为一个用户进行点查询,那么您可以构建如下内容:

              -> queryManyUsers -> keyBy(uId) -> 

    streamToEnrich CoProcessFunction

              -> keyBy(uID) ------------------->

    在这里,您将拆分流,通过窗口或流程函数或async i / o发送一个副本以批量查询hbase,并将结果发送到保存缓存并进行丰富的CoProcessFunction。

    当记录直接沿着底部路径到达此CoProcessFunction时,如果必要数据在缓存中,则使用它。否则,记录被缓冲,等待来自上部路径的高速缓存数据的到达。

    2019-07-17 23:19:50
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
用户态高速块缓存方案 立即下载
分布式高并发缓存6.0 立即下载
高性能Web架构之缓存体系 立即下载