开发者社区> 问答> 正文

Flink SQL No Watermark如何解决?

大家好,请教一个问题

我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 watermark。消费大量的数据的时候,就无法生成watermark。 一直是 No Watermark。 暂时找不到排查问题的思路。 Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 EventTime mode 模式,Blink Planner。 | No Watermark | SQL如下

DDL: create table test( user_id varchar, action varchar, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND ) with();

DML: insert into console select user_id, f_get_str(bind_id) as id_list from ( select action as bind_id, user_id, event_time from ( SELECT user_id, action, PROCTIME() as proc_time, event_time FROM test ) T where user_id is not null and user_id <> '' and CHARACTER_LENGTH(user_id) = 24 ) T group by SESSION(event_time, INTERVAL '10' SECOND), user_id 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner 在translateToPlanInternal 中生成了如下一个 class 代码, public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public WatermarkGenerator$2(Object[] references) throws Exception { } @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { } @Override public Long currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if (!isNull$4) { result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return null; } else { return result$5.getMillisecond(); } } @Override public void close() throws Exception { } }

其中关键的信息是 result$5 = org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond() - ((long) 10000L), field$3.getNanoOfMillisecond()); 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 watermark。 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 这样的结果。因为这部分codegen的代码确实无法进一步debug了。 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢 *来自志愿者整理的flink邮件归档

展开
收起
游客sadna6pkvqnz6 2021-12-07 17:33:52 1758 0
1 条回答
写回答
取消 提交回答
  • hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗? 这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。

    我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。*来自志愿者整理的flink

    2021-12-07 21:13:02
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载