我正在尝试从特定主题轮询数据,例如kafka正在接收100条记录/秒,但大多数情况下它不会获取所有记录。我将超时设置为5000ms,并且每次都调用此方法100ms 注意:我也订阅了特定主题
@Scheduled(fixedDelayString =“ 100”)
public void pollRecords() {
ConsumerRecords<String, String> records =
leadConsumer.poll("5000");
我如何从kafka获取所有数据?
问题来源:Stack Overflow
从poll()返回的最大记录数由max.poll.records消费者配置参数指定。(默认值为500)。此外,还有另一个使用者配置参数,用于限制从服务器返回的最大数据量。(fetch.max.bytes和max.partition.fetch.bytes)
另一方面,在经纪人方面,还有另一个大小限制,称为message.max.bytes。
因此,您应该正确设置这些参数以获取更多消息。
从Kafka文档(链接):
max.poll.records:一次调用poll()时返回的最大记录数。(预设值:500)
fetch.max.bytes:服务器应为获取请求返回的最大数据量。使用者将批量获取记录,并且如果获取的第一个非空分区中的第一个记录批次大于此值,则仍将返回记录批次以确保使用者可以取得进展。因此,这不是绝对最大值。代理接受的最大记录批处理大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。请注意,使用者并行执行多个提取。(预设值:52428800)
message.max.bytes: Kafka允许的最大记录批处理大小。如果增加了该数量,并且有一些使用方的年龄大于0.10.2,则还必须增加使用方的获取大小,以便他们可以获取如此大的记录批次。在最新的消息格式版本中,为了提高效率,始终将记录分组。在以前的消息格式版本中,未压缩的记录不会分组,并且在这种情况下,此限制仅适用于单个记录。可以使用主题级别max.message.bytes配置对每个主题进行设置。(预设值:1000012)
max.partition.fetch.bytes:服务器将返回的每个分区的最大数据量。记录由消费者分批提取。如果提取的第一个非空分区中的第一个记录批处理大于此限制,则仍将返回该批处理以确保使用者可以取得进展。代理接受的最大记录批处理大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。有关限制使用者请求大小的信息,请参见fetch.max.bytes。(预设值:1048576)
回答来源:Stack Overflow
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。