大佬们好,请教一个 Flink CDC 相关的问题:在 Flink 中如何组合 count 窗口和 time 窗口?比如数据达到 100 条或者达到 10 秒钟,哪个条件先满足就触发,这种该怎么实现呢?
整体代码如下。主要逻辑在 onElement 和 onProcessingTime 两个方法中:前者负责按条数触发,即实现 CountTrigger 的功能;后者负责按处理时间触发,即实现 ProcessingTimeTrigger 的功能。为此需要预先定义两个 ReducingState(通过 ReducingStateDescriptor 声明),分别记录当前条数(Count)和下一次触发时间(Time)。ReducingState 的详细用法可参考上文,下面分析主要方法。
/**
 * Window trigger that fires when either `maxCount` elements have arrived or the
 * pending processing-time timer (scheduled `interval` after it was registered)
 * goes off, whichever happens first. The window's end (`window.maxTimestamp()`)
 * also fires it.
 *
 * Every firing returns `TriggerResult.FIRE`, not `FIRE_AND_PURGE`, so the window
 * contents are not purged by this trigger between firings.
 *
 * @param maxCount number of elements after which a count-based firing happens
 * @param interval delay added to `getCurrentProcessingTime` for the time-based
 *                 firing (same unit as Flink processing time, i.e. milliseconds)
 */
class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] {
  // Element counter since the last firing.
  val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
  // Timestamp of the next scheduled interval firing (reduced with ReduceMin, defined elsewhere).
  val timeStateDesc = new ReducingStateDescriptor[Long]("interval", new ReduceMin(), classOf[Long])

  /**
   * Returns the scheduled interval-timer timestamp, or `None` when none is stored.
   *
   * The state is typed with `scala.Long`, so a direct `get() == null` check is flagged
   * by scalac as a constant comparison and cannot reliably detect an empty state.
   * Depending on how the erased result gets boxed, an empty state may show up here as
   * `null` or as `0L`. Processing-time timestamps are expected to be positive, so both
   * are treated as "no timer".
   */
  private def pendingFireTime(triggerContext: Trigger.TriggerContext): Option[Long] =
    (triggerContext.getPartitionedState(timeStateDesc).get(): Any) match {
      case scheduled: Long if scheduled > 0L => Some(scheduled)
      case _                                 => None
    }

  /**
   * Deletes the pending interval timer, if any, unless it coincides with the window end.
   * That timestamp is presumably the window's default firing timer (see the
   * `window.maxTimestamp()` branch in onProcessingTime), which must not be removed.
   */
  private def cancelPendingTimer(window: TimeWindow, triggerContext: Trigger.TriggerContext): Unit =
    pendingFireTime(triggerContext)
      .filter(_ != window.maxTimestamp())
      .foreach(scheduled => triggerContext.deleteProcessingTimeTimer(scheduled))

  /** Registers the next interval timer at `now + interval` and records it in state. */
  private def scheduleNext(triggerContext: Trigger.TriggerContext): Unit = {
    val next = triggerContext.getCurrentProcessingTime + interval
    triggerContext.registerProcessingTimeTimer(next)
    triggerContext.getPartitionedState(timeStateDesc).add(next)
  }

  /**
   * Counts the element. Fires once the count reaches `maxCount`, resetting both the
   * counter and the pending interval timer. Otherwise, makes sure an interval timer is
   * scheduled.
   */
  override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    count.add(1L)
    if (count.get() >= maxCount) {
      val log = s"CountTrigger Triggered Count: ${count.get()}"
      println(formatString(log))
      count.clear()
      // The count won this round: drop the pending time-based firing. The next element
      // schedules a fresh one.
      cancelPendingTimer(window, triggerContext)
      triggerContext.getPartitionedState(timeStateDesc).clear()
      TriggerResult.FIRE
    } else {
      if (pendingFireTime(triggerContext).isEmpty) scheduleNext(triggerContext)
      TriggerResult.CONTINUE
    }
  }

  /**
   * Handles processing-time timers.
   *
   * - A timer at the window end fires the window and cancels any pending interval
   *   timer, without scheduling a new one, because the window is ending.
   * - A timer matching the scheduled interval timestamp fires the window, resets the
   *   counter and schedules the next interval.
   * - Any other timer is ignored.
   */
  override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    val count = triggerContext.getPartitionedState(countStateDesc)
    val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc)
    if (time == window.maxTimestamp()) {
      val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      cancelPendingTimer(window, triggerContext)
      fireTimestamp.clear()
      // Do not register another interval timer here: it would outlive the window.
      TriggerResult.FIRE
    } else if (pendingFireTime(triggerContext).contains(time)) {
      val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}"
      println(formatString(log))
      count.clear()
      fireTimestamp.clear()
      scheduleNext(triggerContext)
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  /** Event time is not used by this trigger. */
  override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  /**
   * Cleans up when the window is removed. This deletes the pending interval timer, so
   * no timer outlives the window, and clears both state entries.
   */
  override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
    cancelPendingTimer(w, triggerContext)
    triggerContext.getPartitionedState(countStateDesc).clear()
    triggerContext.getPartitionedState(timeStateDesc).clear()
  }
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。