Flink CDC里就是消费kafka中的debezium-json格式数据,使用滚动窗口统计1分钟新增的数据量,这个简单的需求,被卡住了。之前一直写的datastream,现在想用flink sql来实现我本来就是消费的kafka数据,为什么还要推到kafka里面?我现在数据源是kafka,kafka中的数据是flink cdc同步到kafka中的,数据有更新,有插入。
// 源表
tableEnv.executeSql("CREATE TABLE KafkaTable (\n" +
" UNSEND_ID
STRING,\n" +
" SEND_TIME
BIGINT,\n" +
" SENDER_CODE
STRING,\n" +
" STATUS
STRING,\n" +
" CREATE_TIME
BIGINT,\n" +
" ts AS TO_TIMESTAMP_LTZ(CREATE_TIME, 3),\n" +
" WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'T_E10_MESSAGE_UNSEND_INFO',\n" +
" 'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
" 'properties.group.id' = 'test',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'debezium-json'\n" +
")\n");// 开窗聚合
tableEnv.executeSql("select\n" +
" TUMBLE_START(ts, interval '1' MINUTE),\n" +
" TUMBLE_END(ts, interval '1' MINUTE),\n" +
" count(1) as cnt\n" +
"from KafkaTable\n" +
"group by TUMBLE(ts, interval '1' MINUTE)").print();
就是上面的数据,我一执行滚动开窗,就报下面的错误怎么解决?Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, KafkaTable, watermark=[-(TO_TIMESTAMP_LTZ($4, 3), 2000:INTERVAL SECOND)]]], fields=[UNSEND_ID, SEND_TIME, SENDER_CODE, STATUS, CREATE_TIME])有点理解了,但是为什么我指定'format' = 'debezium-json'就不行呢?
根据你提供的信息,问题出在使用了debezium-json
格式的数据源。滚动窗口聚合操作不支持消费由TableSourceScan
节点产生的更新和删除更改。这是因为debezium-json
格式的数据源会将数据变更(如插入、更新和删除)作为单独的记录发送到Kafka,而不是将它们作为数据流的一部分。
要解决这个问题,你可以尝试以下方法:
使用Flink SQL的ROW_NUMBER()
函数为每个数据变更分配一个唯一的行号。然后,你可以根据这个行号进行分组和聚合操作。这样,你可以避免使用滚动窗口聚合操作,而是使用分组和聚合操作来处理数据变更。
如果你仍然希望使用滚动窗口聚合操作,你可以考虑将数据变更转换为数据流。这可以通过使用Flink的CEP
(Complex Event Processing)库来实现。CEP
库允许你处理事件流中的模式匹配和状态管理。你可以使用CEP
库来检测数据变更,并将它们转换为数据流,然后再应用滚动窗口聚合操作。
关于为什么指定'format' = 'debezium-json'就不行的问题,这是因为debezium-json
格式的数据源与Flink SQL的滚动窗口聚合操作不兼容。如上所述,debezium-json
格式的数据源会将数据变更作为单独的记录发送到Kafka,而不是将它们作为数据流的一部分。因此,你需要使用其他方法来处理这些数据变更,例如使用Flink SQL的ROW_NUMBER()
函数或使用Flink的CEP
库。
cdc数据不支持,实现需求需要转换下处理下,但是不能接入update和delete语句。一个简单的办法,直接接kafka是json格式,然后消费json来处理,因为你只需要消费debeziuum-json里面after里面的数据。参考:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/formats/debezium/
此回答来自钉群Flink CDC 社区。
报错提示是因为StreamPhysicalGroupWindowAggregate操作目前不支持直接处理由Debezium生成的包含更新和删除事件的变更日志。Debezium JSON格式会携带数据库更改事件的类型(insert、update、delete),而滚动窗口聚合通常用于计算基于插入事件的统计数据。
解决方案可以是:
upsert-kafka
,它能处理这些变更流并将它们以upsert的形式输出。版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。