开发者社区> 问答> 正文

在Apache Flink中手动更新状态的最佳方法是什么?

flink小助手 2018-12-13 14:19:42 425

我在股票市场项目中使用Apache Flink来计算当前的价格变化。公式是

price_change = (current_price - previous_close_price) / previous_close_price
previous_close_price是交易所前一天的证券收盘价。在市场开放前的每一天,我都需要更新previous_close_price。

现在我想出了几个解决方案,但我不知道哪个是最好的。

存储previous_close_price在redis中并在每次计算中获取价格。更新价格既简单又灵活,但此解决方案可能会降低性能。

将状态的TTL设置为1天。旧状态到期时获取新状态。但它不灵活,因为TTL是硬编码的。

广播状态模式。我不确定这个解决方案是否有效。

发送特殊消息给flink。当flink收到消息时,它会更新previous_close_price。

存储 NoSQL Apache Redis 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:20:37

    建议在#4上使用一个变体:

    有两个来源,一个仅用于收盘价,另一个用于交易流。通过安全性键入两个流,并将它们与CoProcessFunction连接。将previous_close_price存储在CoProcessFunction中的键控状态。

    每天,在市场开放之前,以最新的收盘价流动。

    这可以使用RichCoFlatMap完成,但我建议使用CoProcessFunction,因为您可能希望使用侧输出来报告错误(例如缺少previous_close_price的证券)。

    至于其他方法:

    我认为将previous_close_price数据保存在外部数据存储中没有任何好处。
    我觉得这不太好用。没有可用于触发新数据加载的钩子,而且只有在访问状态时才会清除状态。
    这对广播状态来说不是一个好的用例,除非集群中的每个人都需要知道所有证券的收盘价。

    0 0
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题
推荐课程