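Environment
Alibaba Cloud VPC environment; all 3 servers have public IPs. OS: Ubuntu 16.04. Resources: 2 cores, 8 GB RAM per server.
Problem
A Kafka cluster that produces about 5 million messages per day. Once a single topic holds more than 20 million messages, the error below starts to appear. The symptom is that producing works normally but consuming does not: the consumer keeps logging "connection failed -- refreshing metadata". At first only one node reported the error; eventually all nodes did. Topics with a small amount of data work fine the whole time.
The error log follows.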
[2020-01-27 19:31:05,541] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 3 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2020-01-27 19:31:05,542] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={RMZK-SAAS-FILTER-0=(fetchOffset=19299, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-1=(fetchOffset=19186, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-3=(fetchOffset=0, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[3])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
at scala.Option.foreach(Option.scala:407)
at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
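To make the WARN line easier to read, here is a minimal Python sketch that pulls the per-partition fields out of the fetchData section. The regular expression and the names parse_fetch_data and LOG_LINE are my own, written against the line format shown above only; this is not a Kafka tool. fetchOffset is the offset from which the follower (replicaId=2) asks the leader (leaderId=3) for data, so the output shows how far each replica has got.

```python
import re

# The fetchData part of the WARN line above, copied from the broker log.
LOG_LINE = (
    "fetchData={RMZK-SAAS-FILTER-0=(fetchOffset=19299, logStartOffset=0, "
    "maxBytes=10240000, currentLeaderEpoch=Optional[12]), "
    "RMZK-SAAS-FILTER-1=(fetchOffset=19186, logStartOffset=0, "
    "maxBytes=10240000, currentLeaderEpoch=Optional[12]), "
    "RMZK-SAAS-FILTER-3=(fetchOffset=0, logStartOffset=0, "
    "maxBytes=10240000, currentLeaderEpoch=Optional[3])}"
)

# Assumed pattern: <topic>-<partition>=(fetchOffset=..., logStartOffset=..., ...)
PARTITION_RE = re.compile(
    r"([\w.\-]+)-(\d+)=\(fetchOffset=(\d+), logStartOffset=(\d+), "
    r"maxBytes=(\d+), currentLeaderEpoch=Optional\[(\d+)\]\)"
)


def parse_fetch_data(line):
    """Return one dict per partition found in a fetchData={...} section."""
    partitions = []
    for match in PARTITION_RE.finditer(line):
        topic, partition, fetch_offset, log_start, max_bytes, epoch = match.groups()
        partitions.append({
            "topic": topic,
            "partition": int(partition),
            "fetch_offset": int(fetch_offset),
            "log_start_offset": int(log_start),
            "max_bytes": int(max_bytes),
            "leader_epoch": int(epoch),
        })
    return partitions


for entry in parse_fetch_data(LOG_LINE):
    print(entry["topic"], entry["partition"],
          "fetchOffset:", entry["fetch_offset"],
          "leaderEpoch:", entry["leader_epoch"])
```

In this particular line, partitions 0 and 1 are being fetched from offsets around 19,000, while partition 3 is being fetched from offset 0 at a different leader epoch.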
The broker configuration follows.
broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://(public IP):9092
num.network.threads=3
num.io.threads=4
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-data
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
replica.fetch.max.bytes=10240000
advertised.port=9092
advertised.host.name=(public IP)
default.replication.factor=2
delete.topic.enable=true
message.max.bytes=10000000
port=9092
auto.create.topics.enable=true
delete.retention.ms=1000
flush.interval.messages=10000
flush.interval.ms=1000
group.max.session.timeout.ms=100000
The configuration files on the 3 servers are identical apart from the IP and broker.id.
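For anyone reading the configuration, here is a small Python sketch that parses a subset of it and lists the settings that bear on replication and consuming. The helper names (parse_properties, review) and the choice of checks are my own assumptions, not a diagnosis of this cluster; the public-IP placeholder stands in for the real address. The notes only restate documented Kafka behavior: brokers reach each other through the advertised listener, offsets.topic.replication.factor applies when the internal offsets topic is first created, and port, advertised.port and advertised.host.name are deprecated in Kafka 2.x.

```python
# A subset of the server.properties shown above.
CONFIG_TEXT = """\
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://(public IP):9092
offsets.topic.replication.factor=1
default.replication.factor=2
replica.fetch.max.bytes=10240000
message.max.bytes=10000000
advertised.port=9092
advertised.host.name=(public IP)
port=9092
"""

# Deprecated in Kafka 2.x: only used when listeners / advertised.listeners are unset.
DEPRECATED_KEYS = ("port", "host.name", "advertised.port", "advertised.host.name")


def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def review(props):
    """Return plain-text notes on settings related to replication and consuming."""
    notes = []
    fetch_max = int(props.get("replica.fetch.max.bytes", "0"))
    message_max = int(props.get("message.max.bytes", "0"))
    if fetch_max < message_max:
        notes.append("replica.fetch.max.bytes is smaller than message.max.bytes")
    if props.get("offsets.topic.replication.factor") == "1":
        notes.append("offsets.topic.replication.factor=1: the internal offsets "
                     "topic is created with a single replica per partition")
    for key in DEPRECATED_KEYS:
        if key in props:
            notes.append(f"{key} is deprecated and ignored when listeners / "
                         "advertised.listeners are set")
    advertised = props.get("advertised.listeners")
    if advertised:
        notes.append(f"brokers and clients are told to connect to {advertised}")
    return notes


props = parse_properties(CONFIG_TEXT)
for note in review(props):
    print("-", note)
```

With the values above, the size check passes (replica.fetch.max.bytes is not smaller than message.max.bytes), so the sketch only reports the single-replica offsets topic, the three deprecated keys, and the advertised address. Whether any of these relates to the disconnects cannot be told from the post alone.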