Flink TTL(状态有效期)配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 当不知道何时清理状态的时候,状态有效期登场!

需求分析:
每日活跃用户在实时计算中非常常见。
通常会采用两点识别当日的用户:

- 是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用
- 由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重

问题描述:
如何定义这个一天?
正常逻辑为用一个状态去记录该用户上次登陆的时间和零点去进行对比,如果他们之间的差值小于24小时,那么就确定用户为当日登陆。否则,该用户就不是当日登陆。
可能教程中为了介绍TTL而强行引入,用作当日登陆的过期时间的判定。(我感觉这中间是有问题的)
TTL介绍:
看官网介绍:

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
TTL使用:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build
  
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

TTL源码解析:

private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
  1. UpdateType:此选项值配置何时更新延长状态 TTL 的上次访问时间戳。

    • Disabled TTL 被禁用。 状态不会过期
    • OnCreateAndWrite 上次访问时间戳在每次写入操作创建和更新状态时初始化。
    • OnReadAndWrite 与 OnCreateAndWrite 相同,但也在读取时更新。
  2. StateVisibility:此选项配置是否可以返回过期的用户值。

    • ReturnExpiredIfNotCleanedUp 如果尚未清理,则返回过期的用户值。
    • NeverReturnExpired 永远不要返回过期的用户值。
  3. TtlTimeCharacteristic:此选项配置时间刻度以用于 ttl。暂时仅支持ProcessingTime。
  4. ttl:此选项配置过期时间。
  5. CleanupStrategies:此类配置何时使用 TTL 清除过期状态。 默认情况下,如果发现过期,则始终在显式读取访问时清除状态。

问题解决代码:

val UVStream: DataStream[JSONObject] = jsonStream.keyBy(_.getJSONObject("common").getString("mid"))
      .filter(new RichFilterFunction[JSONObject] {
        val fomat = new SimpleDateFormat("yyyy-MM-dd")
        val ttlConfig = StateTtlConfig
          .newBuilder(Time.seconds(1))
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
          .build()
        private val stateDes = new ValueStateDescriptor[String]("state", classOf[String])
        stateDes.enableTimeToLive(ttlConfig)
        lazy val state: ValueState[String] = getRuntimeContext
          .getState(stateDes)

        override def filter(t: JSONObject) = {
          val lastPageId: String = t.getJSONObject("page").getString("last_page_id")
          if (lastPageId == null || lastPageId.length <= 0) {
            val lastDate: String = state.value()
            val ts: lang.Long = t.getLong("ts")
            val curDate: String = fomat.format(ts)
            if (lastDate == null || !lastDate.equals(curDate)) {
              true
            } else {
              false
            }
          } else {
            false
          }
        }
      })

结束语:
感觉这个配置项还是很鸡肋,不精确。如果要计算当日活跃用户,那么我是不是要确保我状态的创建时间在零点?

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
98 0
|
3月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
99 0
|
2月前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
114 9
|
3月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
159 4
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
211 0
|
5月前
|
资源调度 调度 流计算
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
|
5月前
|
存储 NoSQL 分布式数据库
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
417 2
|
6月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。