各位大佬,小白求教:
我想实现一个功能,让flink能够动态更新数据源。
我现在使用的是继承RichSourceFunction方法,然后在open方法里面起一个线程定时检查kafka里面的topic变化,如果topic新增,则在run方法里面重新初始化kafkaConsumer然后poll数据,再将value值collect到ctx中。
这样做我发现并不能将新增后的topic消费到,还是只能消费代码启动的时候的topic,这是为什么?
请问有什么好的办法能实现这样的功能吗?
在 Apache Flink 中,动态修改 Kafka 数据源是一个比较复杂的需求,因为 Flink 的 Kafka Consumer 通常是在初始化时就配置好的,并且会静态地消费指定的 Topic。一旦开始运行,它不会自动感知 Topic 的变化。
使用 Flink Kafka Connector 的动态 Topic 功能:
Flink 的 Kafka Connector 支持动态 Topic,这意味着你可以在运行时添加或删除 Topics。你需要在创建 Kafka Consumer 时配置 subscribe 方法来订阅一个空字符串或者使用 subscribePattern 方法来使用某种模式匹配 Topics。
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
"pattern", // 使用订阅模式
new SimpleStringSchema(),
properties);
kafkaConsumer.setStartFromGroupOffsets(); // 从组偏移量开始消费
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。