开发者社区 问答 正文

KAFKA: Connection to 3 was disconnected before the

环境

阿里云VPC环境,3台服务器均有公网IP 系统:Ubuntu16.04 资源:2C8G

问题 KAFKA集群,每天生产数据500万,单topic的数据超过2000万条时开始报这个错误, 现象是 生产数据正常,但是无法消费, 消费端不停的 connection failed --refreshing metadata 刚开始是一个节点报错,最后发展成所有节点都报错,但是数据量小的topic一切正常

下面贴出报错

[2020-01-27 19:31:05,541] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 3 was disconnected before the response was read
	at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
	at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
	at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
	at scala.Option.foreach(Option.scala:407)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2020-01-27 19:31:05,542] WARN [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={RMZK-SAAS-FILTER-0=(fetchOffset=19299, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-1=(fetchOffset=19186, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[12]), RMZK-SAAS-FILTER-3=(fetchOffset=0, logStartOffset=0, maxBytes=10240000, currentLeaderEpoch=Optional[3])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
	at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
	at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:107)
	at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:196)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:286)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:133)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:132)
	at scala.Option.foreach(Option.scala:407)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:132)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:114)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

下面是配置信息


broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://(公网IP):9092
num.network.threads=3
num.io.threads=4
socket.send.buffer.bytes=1024000
socket.receive.buffer.bytes=1024000
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-data
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=3000
replica.fetch.max.bytes=10240000
advertised.port=9092
advertised.host.name=(公网IP)
default.replication.factor=2
delete.topic.enable=true
message.max.bytes=10000000
port=9092
auto.create.topics.enable=true
delete.retention.ms=1000
flush.interval.messages=10000
flush.interval.ms=1000
group.max.session.timeout.ms=100000

3台服务的配置文件除了IP和broke id 其他都一致

展开
收起
人民中科-25737 2020-01-27 19:43:26 6243 分享 版权
阿里云 AI 助理回答

根据您提供的信息,Kafka集群在处理大量数据时遇到问题,特别是当单个topic的数据量超过2000万条时,开始出现消费端无法正常消费的情况。错误提示显示与连接断开和元数据刷新有关。以下是一些可能的解决方案和优化建议:

  1. 增加资源或调整配置

    • 检查服务器资源使用情况(CPU、内存、磁盘I/O),确保没有达到瓶颈。如果资源紧张,考虑升级实例规格或增加节点。
    • 调整socket.send.buffer.bytes, socket.receive.buffer.bytes, 和 socket.request.max.bytes等网络相关的配置以适应更高的吞吐量需求。
  2. 优化Kafka配置

    • replica.fetch.max.bytes: 当前设置为10MB,这限制了从领导者复制到追随者的数据量。考虑到您的消息大小和吞吐量,可能需要调高这个值。
    • message.max.bytes: 也应相应增大,以允许更大的消息通过系统。
    • num.network.threads, num.io.threads: 根据负载情况适当增加这些线程数,以提高处理能力。
    • log.retention.hours=168 (7天):确认是否符合数据保留策略,过短可能导致频繁的数据清理操作,影响性能。
  3. 检查网络稳定性

    • 确保所有节点间的网络连接稳定,公网IP之间的通信可能存在不稳定因素,考虑使用阿里云VPC内的私有IP进行通信,减少公网波动的影响。
    • 使用阿里云的专有网络VPC服务,并配置安全组规则,确保Kafka节点间通讯畅通无阻。
  4. Zookeeper配置与监控

    • 确认Zookeeper集群健康且配置得当,因为Kafka高度依赖于Zookeeper。Zookeeper的延迟或故障也可能导致这类问题。
    • 监控Zookeeper的连接状态和响应时间,及时发现并解决问题。
  5. 分区策略

    • 考虑增加topic的分区数量(num.partitions)来分散负载。对于高吞吐量的topic,更多的分区可以提升并行处理能力。
    • 确保生产者和消费者都正确地利用了分区,避免数据倾斜。
  6. 日志清理策略

    • 检查并优化日志清理策略(log.cleanup.policy),如使用“delete”策略并合理设置日志保留时间,避免因日志滚动过于频繁而影响性能。
  7. 监控与日志分析

    • 使用阿里云的Log Service或Prometheus+Grafana等工具对Kafka集群进行监控,收集详细的日志和指标,以便更深入地分析问题所在。

综上所述,解决这个问题可能需要综合调整配置、优化网络环境、增强监控以及可能的硬件资源升级。希望这些建议能帮助您解决问题。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答