问题描述
根据Azure Event Hub示例文档,[将 Apache Flink 与适用于 Apache Kafka 的 Azure 事件中心配合使用],配置好 consumer.config 文件后,为什么不能自动消费Event Hub中存储的旧数据呢?
如果发送端(生产者)没有任何消息输出,这消费端的信息也是一片空白,如:
问题分析
在 Kafka 中,每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时,就会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数的默认值为 “latest” 。
auto.offset.reset 的值可以为 earliest、latest 和 none 。关于 earliest 和 latest 的解释,真实情况如下所示:
- earliest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费。
- latest :当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费该分区下新产生的数据。
- none :topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
使用Flink消费端以Kafka方式消费Event Hub中的事件,当消费端第一次运行时,因为默认配置,所以消费端只消费新产生的数据。
如果需要消费旧的数据,需要在 consumer.config 中添加 auto.offset.reset = earliest 。
修改后的测试效果为:
参考资料
Flink消费kafka的offset设置:https://blog.csdn.net/pageniao/article/details/124279617
Seeking to a Specific Offset:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek