大家好,请教一个问题
我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 一直是 No Watermark。 暂时找不到排查问题的思路。 Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 | No Watermark | SQL如下
DDL: create table test( user_id varchar, action varchar, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) with();
DML: insert into console select user_id, f_get_str(bind_id) as id_list from ( select action as bind_id, user_id, event_time from ( SELECT user_id, action, PROCTIME() as proc_time, event_time FROM test ) T where user_id is not null and user_id <> '' and CHARACTER_LENGTH(user_id) = 24 ) T group by SESSION(event_time, INTERVAL '10' SECOND), user_id 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner 在translateToPlanInternal 中生成了如下一个 class 代码, public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }
其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 *来自志愿者整理的flink邮件归档
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗? 这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。
我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。