各位大神求帮忙看一下。
Flink 版本:1.10.0 Planner:blink
我在使用Flink SQL的时候遇到了一个问题,能否帮忙看一下,我尝试在寻找了解决方法,但是没有起作用。 比如我发现类似的问题 https://www.mail-archive.com/user-zh@flink.apache.org/msg03916.html 中描述的问题,根据这个mail中的解决方法我设置了timezone,但是问题没有被解决。
Flink Table Env配置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); * env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);* * env.setParallelism(1);* *EnvironmentSettings envSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); * StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting); *tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); *
我这个job应用中定义了两个table,分别为source table “sqlDdlAnaTable”
String sqlDdlAnaTable = "CREATE TABLE ana_Source(type INT, datatime BIGINT, list ARRAY <ROW(id STRING, v FLOAT, q INTEGER)>, ts AS TO_TIMESTAMP(FROM_UNIXTIME(datatime)), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND)" + * " WITH (" +* * "'connector.type' = 'pravega'," +* * "'connector.version' = '1'," +* * "'connector.connection-config.controller-uri'= 'tcp://192.168.188.130:9090 http://192.168.188.130:9090'," +* * "'connector.connection-config.default-scope' = 'Demo'," +* * "'connector.reader.stream-info.0.stream' = 'test'," +* * "'format.type' = 'json'," +* * "'format.fail-on-missing-field' = 'false', " +* * "'update-mode' = 'append')";*
和sink table " sqlDdlSinkTable ".
在数据处理逻辑比较简单,计算10s tumble window的vaule的平均。 我一开始直接打印结果能够明确看到10s中输出一次计算结果,watermark也正常移动。 String sqlAna = "SELECT ts, id, v " + * "FROM tb_JsonRecord " +* * "WHERE q=1 AND type=1";* * Table tableAnaRecord = tableEnv.sqlQuery(sqlAna);* * tableEnv.registerTable("tb_AnaRecord", tableAnaRecord);*
tableEnv.toAppendStream(tableAnaRecord, Row.class).print()
但是我尝试将结果insert到sink table中发现,就没有任何结果被写入。 String sqlAnaAvg = "INSERT INTO tb_sink(id, wStart, v) " + * "SELECT id, " +* * "TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, " +* * "AVG(v) FROM tb_AnaRecord " +* * "GROUP BY TUMBLE(ts, INTERVAL '10' SECOND), id"; * * tableEnv.sqlUpdate(sqlAnaAvg);*
提前感谢!
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
根据你的描述,你遇到的问题是在使用Apache Flink SQL进行窗口聚合计算后,尝试将结果插入到sink表中时没有数据被写入。尽管直接打印结果时能看到预期的输出。这里有几个可能的原因和相应的检查建议:
Sink配置问题:首先确认sink表tb_sink
的配置是否正确无误,特别是与Pravega相关的连接设置、格式类型等。确保sink端能够正确接收并处理数据。
Watermark对齐问题:虽然你提到了watermark正常移动,但请再次确认在执行窗口聚合时,watermark是否已经超前于窗口结束时间,从而触发计算完成并写出数据。有时候,水印推进不够或者不准确会导致窗口计算无法完成。
并发度问题:尽管你设置了环境的并行度为1,但在某些情况下,如果sink操作或上下游算子有隐式的并行度设置,可能会导致数据分流而未实际写入期望的sink。检查sink操作是否有特定的并行度要求,并确认所有相关算子的并行度设置是一致的。
错误处理与日志:查看Flink作业的日志,尤其是sink operator的日志,看是否有错误信息或警告提示。这可能是数据写入失败的直接原因,比如权限问题、网络问题或序列化错误等。
资源限制:检查Flink集群的资源情况,包括内存、CPU等是否足够。资源不足可能导致任务调度失败或运行缓慢,间接影响数据写入。
事务/Exactly Once语义:由于你设置了sink的写入模式为atleast_once
,这通常意味着牺牲了一定程度的一致性来换取可用性。不过,请确认Pravega sink是否正确处理了事务边界,以及事务提交逻辑是否符合预期。有时,事务管理不当也会导致数据未能成功写入。
版本兼容性:虽然不太常见,但不同版本的Flink、Pravega connector或其他依赖库之间可能存在兼容性问题。确认你使用的Flink版本与Pravega connector版本之间是兼容的。
针对以上几点,你可以逐一排查,从最简单的配置检查开始,逐步深入到更复杂的逻辑和性能问题。同时,利用Flink提供的metrics监控和日志系统,可以更有效地定位问题所在。