Environment
Alibaba Cloud VPC; all 3 servers have public IPs. OS: Ubuntu 16.04. Resources: 2C8G (2 cores, 8 GB RAM).
Problem
Kafka cluster producing about 5 million messages per day. Once a single topic exceeds 20 million messages, the error below starts to appear. Producing works normally, but consuming does not: the consumer loops endlessly on connection failed -- refreshing metadata. At first only one node reported the error, but eventually every node did; topics with small data volumes work fine.
Here is the error:
[2020-01-27 19:31:05,541] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 3 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2020-01-27 19:31:05,542] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={RMZK-SAAS-FILTER-0=(fetchOffset=19299, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-1=(fetchOffset=19186, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-3=(fetchOffset=0, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[3])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Here is the broker configuration:
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://(public IP):9092
num.network.threads=3
num.io.threads=4
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-data
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
replica.fetch.max.bytes=10240000
advertised.port=9092
advertised.host.name=(public IP)
default.replication.factor=2
delete.topic.enable=true
message.max.bytes=10000000
port=9092
auto.create.topics.enable=true
delete.retention.ms=1000
flush.interval.messages=10000
flush.interval.ms=1000
group.max.session.timeout.ms=100000
Apart from the IP and broker.id, the configuration files on all 3 servers are identical.
Based on the information you provided, the Kafka cluster runs into trouble under heavy load: once a single topic exceeds about 20 million messages, consumers can no longer read from it, and the errors point to dropped connections and constant metadata refreshing. Here are some possible fixes and tuning suggestions:
Increase resources or adjust configuration: the brokers may need more headroom than 2C8G provides; also consider raising socket.send.buffer.bytes, socket.receive.buffer.bytes, socket.request.max.bytes, and other network-related settings to match the higher throughput.

Optimize the Kafka configuration: tune the broker settings to the actual workload (a sketch with example values follows below).
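For illustration only, a hedged server.properties sketch covering the two points above; these are example starting points for small brokers, not tested recommendations:

# Example values only -- benchmark before adopting any of these.
# More network threads help the broker keep up with socket traffic:
num.network.threads=4
# A common rule of thumb is about 2x the CPU core count for I/O threads:
num.io.threads=8
# Larger OS socket buffers for high-volume producers/consumers:
socket.send.buffer.bytes=2048000
socket.receive.buffer.bytes=2048000
# Buffer and timeout for broker-to-broker replica fetcher connections:
replica.socket.receive.buffer.bytes=1024000
replica.socket.timeout.ms=60000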
Check network stability: the IOException shows broker 2's replica fetcher being disconnected from broker 3 before a response was read, so verify inter-broker connectivity, bandwidth, and any security group or firewall settings between the nodes (a quick check is sketched below).
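As a quick reachability check between brokers, something like the following could rule out basic connectivity problems (the hostname is a placeholder for your actual broker address):

# From broker 2, confirm broker 3 accepts TCP connections on the Kafka port:
nc -vz broker3-public-ip 9092
# Ask broker 3 for its API versions; a response proves a working Kafka-level connection:
kafka-broker-api-versions.sh --bootstrap-server broker3-public-ip:9092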
Zookeeper configuration and monitoring: make sure the ensemble behind zookeeper.connect is healthy and responsive; ZooKeeper session timeouts on a broker cause leadership churn that can surface as errors like the ones above (a simple probe is sketched below).
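A minimal ZooKeeper health probe, assuming the four-letter-word commands are enabled (on ZooKeeper 3.5+ they must be whitelisted via 4lw.commands.whitelist):

# A reply of "imok" means the server considers itself healthy:
echo ruok | nc zookeeper 2181
# "stat" reports the server mode (leader/follower) and client connection counts:
echo stat | nc zookeeper 2181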
Partition strategy: increase the number of partitions (num.partitions) to spread the load; for high-throughput topics, more partitions improve parallel processing (an example command follows below).

Log cleanup strategy: verify that the retention settings (log.retention.hours=168, log.segment.bytes) keep disk usage under control as the large topics grow.
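As an example, raising the partition count of the affected topic (the topic name comes from the log above; the target of 6 partitions is arbitrary, and partitions can only be increased, never decreased):

kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic RMZK-SAAS-FILTER --partitions 6
# On brokers older than Kafka 2.2, use --zookeeper zookeeper:2181 instead of --bootstrap-server.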
Monitoring and log analysis: monitor consumer lag and the broker server logs so that the first node to misbehave is caught early (see the command below).
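Consumer lag can be watched from the command line; the group name here is a placeholder:

# The LAG column shows how far each partition's consumer trails the log end offset:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group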
In summary, resolving this will likely take a combination of configuration tuning, network troubleshooting, stronger monitoring, and possibly a hardware upgrade. I hope these suggestions help.