Flink TTL(状态有效期)配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 当不知道何时清理状态的时候,状态有效期登场!

需求分析:
每日活跃用户在实时计算中非常常见。
通常会采用两点识别当日的用户:

- 是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用
- 由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重

问题描述:
如何定义这个一天?
正常逻辑为用一个状态去记录该用户上次登陆的时间和零点去进行对比,如果他们之间的差值小于24小时,那么就确定用户为当日登陆。否则,该用户就不是当日登陆。
可能教程中为了介绍TTL而强行引入,用作当日登陆的过期时间的判定。(我感觉这中间是有问题的)
TTL介绍:
看官网介绍:

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
TTL使用:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build
  
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

TTL源码解析:

private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
  1. UpdateType:此选项值配置何时更新延长状态 TTL 的上次访问时间戳。

    • Disabled TTL 被禁用。 状态不会过期
    • OnCreateAndWrite 上次访问时间戳在每次写入操作创建和更新状态时初始化。
    • OnReadAndWrite 与 OnCreateAndWrite 相同,但也在读取时更新。
  2. StateVisibility:此选项配置是否可以返回过期的用户值。

    • ReturnExpiredIfNotCleanedUp 如果尚未清理,则返回过期的用户值。
    • NeverReturnExpired 永远不要返回过期的用户值。
  3. TtlTimeCharacteristic:此选项配置时间刻度以用于 ttl。暂时仅支持ProcessingTime。
  4. ttl:此选项配置过期时间。
  5. CleanupStrategies:此类配置何时使用 TTL 清除过期状态。 默认情况下,如果发现过期,则始终在显式读取访问时清除状态。

问题解决代码:

val UVStream: DataStream[JSONObject] = jsonStream.keyBy(_.getJSONObject("common").getString("mid"))
      .filter(new RichFilterFunction[JSONObject] {
        val fomat = new SimpleDateFormat("yyyy-MM-dd")
        val ttlConfig = StateTtlConfig
          .newBuilder(Time.seconds(1))
          .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
          .build()
        private val stateDes = new ValueStateDescriptor[String]("state", classOf[String])
        stateDes.enableTimeToLive(ttlConfig)
        lazy val state: ValueState[String] = getRuntimeContext
          .getState(stateDes)

        override def filter(t: JSONObject) = {
          val lastPageId: String = t.getJSONObject("page").getString("last_page_id")
          if (lastPageId == null || lastPageId.length <= 0) {
            val lastDate: String = state.value()
            val ts: lang.Long = t.getLong("ts")
            val curDate: String = fomat.format(ts)
            if (lastDate == null || !lastDate.equals(curDate)) {
              true
            } else {
              false
            }
          } else {
            false
          }
        }
      })

结束语:
感觉这个配置项还是很鸡肋,不精确。如果要计算当日活跃用户,那么我是不是要确保我状态的创建时间在零点?

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用合集之brokers可以配置多个,但是只要第1个brokers挂了任务就挂了如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
22 3
|
4天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 2
|
4天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版产品使用合集之用于实时同步整个数据库时,该如何配置DB2
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
28 5
|
6天前
|
SQL 资源调度 Java
Flink问题之动态配置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
52 1
|
3天前
|
SQL 数据处理 API
实时计算 Flink版产品使用合集之配置的Managed Memory不生效如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
10 0
|
3天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之可以通过配置Oracle数据库的schema注册表来监测表结构的变化吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
12 1
|
4天前
|
关系型数据库 MySQL API
实时计算 Flink版产品使用合集之不设置ttl会导致ckp越来越大,设置了会丢失数据如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
4天前
|
SQL 存储 Java
实时计算 Flink版产品使用合集之怎么配置日志的输出格式和文件大小
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 1
|
4天前
|
消息中间件 关系型数据库 Java
实时计算 Flink版产品使用合集之在生产环境中配置Flink CDC,主要需要考虑什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
25 1
|
4天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之有没有什么好的配置方案
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
9 1