我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
public class EarlyEmitter { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true); tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");
Table table = tEnv.fromDataStream( env.addSource(new SourceData()), "generate_time, name, city, id, event_time.proctime"); tEnv.createTemporaryView("person", table);
String emit = "SELECT name, COUNT(DISTINCT id)" + "FROM person " + "GROUP BY TUMBLE(event_time, interval '10' second), name";
Table result = tEnv.sqlQuery(emit); tEnv.toRetractStream(result, Row.class).print();
env.execute("IncrementalGrouping"); }
private static final class SourceData implements SourceFunction<Tuple4<Long, String, String, Long>> { @Override public void run(SourceContext<Tuple4<Long, String, String, Long>> ctx) throws Exception { while (true) { long time = System.currentTimeMillis(); ctx.collect(Tuple4.of(time, "flink", "bj", 1L)); } }
@Override public void cancel() {
} } }*来自志愿者整理的FLINK邮件归档
现在Emit的原理是这样子的: - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器; - 当定时器到了的时候, - 检查当前的key下的聚合结果跟上次输出的结果是否有变化, - 如果有变化,就发送-[old], +[new] 两条结果到下游; - 如果是没有变化,则不做任何处理; - 再次注册一个新的emit delay之后的处理时间定时器。
你可以根据这个原理,再对照下你的数据,看看是否符合预期。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。