我在用 Debezium 从数据库读取最新的 binlog 写入 Kafka,比如对于 mysql_server.test.tableA 有一个 topic “mysql_server.test.tableA”,我需要在 Flink 里实现这样的逻辑:
问题是我怎么能暂停消费 binlog-stream 呢? 我目前想到的办法是用 flink state 做一个全局状态 startBinlog,初始值为 false:
binlog-stream -> waitOperator -> sinkOperator table-stream -> notifyOperator -> sinkOperator
两个流被合并输出到 sinkOperator,waitOperator() 会 while loop阻塞式的检查全局状态, 等 table-stream 消费完(不知道怎么判断消费完了。。。), notifyOperator 修改全局状态,这样 binlog-stream 就能被继续消费了。
但由于 kafka consumer 如果长期阻塞不 ack 的话,kafka consumer 会被断开,所以这个做法应该是不行的。
请教怎么破?
谢谢!*来自志愿者整理的FLINK邮件归档
你这里的合并是用join 来做么? 这样的话,会比较耗性能。
一种做法是先消费 jdbc table, 消费完后,再切换到 kafka 上。这种要求 binlog 是幂等操作的,因为会有多处理一部分的 binlog,没法做到 精确地切换到 kafka offset 上。
另外你也可以参考下 StreamSQL 的 bootstrap 的做法: https://streamsql.io/blog/from-apache-kafka-to-apache-pulsar*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。