最近需要使用Flink计算TopN碰到一些问题 不知道大家有没有遇到过 计算TopN的所使用的SQL语句是如下形式
create stream input table raw_log ( country STRING, domain STRING, flux LONG, request LONG, rowtime AS ROWTIME(request, "2 SECOND") ) USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", startingOffsets = earliest, subscribe = "input" ) ROW FORMAT JSON; create stream output table top_n_result USING kafka ( kafka.bootstrap.servers = "localhost:${actualConfig.kafkaPort}", topic = "output" ) ROW FORMAT JSON TBLPROPERTIES("update-mode" = upsert); create view window_log as select TUMBLE_START(rowtime, INTERVAL '2' SECOND) as wStart, country, domain, sum(flux) as flux from raw_log group by TUMBLE(rowtime, INTERVAL '2' SECOND), country, domain; insert into top_n_result SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY wStart ORDER BY flux desc ) AS row_num FROM window_log ) WHERE row_num <= 10;
就是前面是一个基于事件时间的窗口计算逻辑后面跟着一个TopN的计算逻辑 跑在Flink 1.9的blink上的 在TopN计算上先按窗口开始时间做分区然后排序输出Top结果 这里就产生了
一个状态管理的问题 因为窗口计算是不断向前的 也就是将窗口开始时间作为分区键会导致状态不断增大 后续在测试过程中发现其底层是实现为RetractableTopNFunction 然后在这个实现中没有发现状态清理的逻辑 而在
AppendOnlyTopNFunction和UpdatableTopNFunction中存在状态清理的逻辑 为什么要这么实现? 能否在RetractableTopNFunction中实现状态清理? 并且保证状态安全被删除?*来自志愿者整理的flink邮件归档
这应该是一个 mistake。 我创建了一个 issue 去跟踪这个问题。 https://issues.apache.org/jira/browse/FLINK-14119 https://issues.apache.org/jira/browse/FLINK-14119*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。