开发者社区> 问答> 正文

请问flinksql如何控制结果输出的频率呢?

我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:

public class EarlyEmitter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true); tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");

Table table = tEnv.fromDataStream( env.addSource(new SourceData()), "generate_time, name, city, id, event_time.proctime"); tEnv.createTemporaryView("person", table);

String emit = "SELECT name, COUNT(DISTINCT id)" + "FROM person " + "GROUP BY TUMBLE(event_time, interval '10' second), name";

Table result = tEnv.sqlQuery(emit); tEnv.toRetractStream(result, Row.class).print();

env.execute("IncrementalGrouping"); }

private static final class SourceData implements SourceFunction<Tuple4<Long, String, String, Long>> { @Override public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws Exception { while (true) { long time = System.currentTimeMillis(); ctx.collect(Tuple4.of(time, "flink", "bj", 1L)); } }

@Override public void cancel() {

} } }*来自志愿者整理的FLINK邮件归档

展开
收起
玛丽莲梦嘉 2021-12-03 18:33:04 1400 0
1 条回答
写回答
取消 提交回答
  • 现在Emit的原理是这样子的: - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器; - 当定时器到了的时候, - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, - 如果有变化,就发送-[old], +[new] 两条结果到下游; - 如果是没有变化,则不做任何处理; - 再次注册一个新的emit delay之后的处理时间定时器。

    你可以根据这个原理,再对照下你的数据,看看是否符合预期。*来自志愿者整理的FLINK邮件归档

    2021-12-03 18:56:03
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
图计算优化技术探索 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载