需求分析:
每日活跃用户在实时计算中非常常见。
通常会采用两点识别当日的用户:
- 是识别出该访客打开的第一个页面,表示这个访客开始进入我们的应用
- 由于访客可以在一天中多次进入应用,所以我们要在一天的范围内进行去重
问题描述:
如何定义这个一天?
正常逻辑为用一个状态去记录该用户上次登陆的时间和零点去进行对比,如果他们之间的差值小于24小时,那么就确定用户为当日登陆。否则,该用户就不是当日登陆。
可能教程中为了介绍TTL而强行引入,用作当日登陆的过期时间的判定。(我感觉这中间是有问题的)
TTL介绍:
看官网介绍:
任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值。所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。
TTL使用:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
TTL源码解析:
private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
UpdateType:此选项值配置何时更新延长状态 TTL 的上次访问时间戳。
- Disabled TTL 被禁用。 状态不会过期
- OnCreateAndWrite 上次访问时间戳在每次写入操作创建和更新状态时初始化。
- OnReadAndWrite 与 OnCreateAndWrite 相同,但也在读取时更新。
StateVisibility:此选项配置是否可以返回过期的用户值。
- ReturnExpiredIfNotCleanedUp 如果尚未清理,则返回过期的用户值。
- NeverReturnExpired 永远不要返回过期的用户值。
- TtlTimeCharacteristic:此选项配置时间刻度以用于 ttl。暂时仅支持ProcessingTime。
- ttl:此选项配置过期时间。
- CleanupStrategies:此类配置何时使用 TTL 清除过期状态。 默认情况下,如果发现过期,则始终在显式读取访问时清除状态。
问题解决代码:
val UVStream: DataStream[JSONObject] = jsonStream.keyBy(_.getJSONObject("common").getString("mid"))
.filter(new RichFilterFunction[JSONObject] {
val fomat = new SimpleDateFormat("yyyy-MM-dd")
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build()
private val stateDes = new ValueStateDescriptor[String]("state", classOf[String])
stateDes.enableTimeToLive(ttlConfig)
lazy val state: ValueState[String] = getRuntimeContext
.getState(stateDes)
override def filter(t: JSONObject) = {
val lastPageId: String = t.getJSONObject("page").getString("last_page_id")
if (lastPageId == null || lastPageId.length <= 0) {
val lastDate: String = state.value()
val ts: lang.Long = t.getLong("ts")
val curDate: String = fomat.format(ts)
if (lastDate == null || !lastDate.equals(curDate)) {
true
} else {
false
}
} else {
false
}
}
})
结束语:
感觉这个配置项还是很鸡肋,不精确。如果要计算当日活跃用户,那么我是不是要确保我状态的创建时间在零点?