What exceptions appear on the Client and Broker sides when Kafka sends a message larger than the broker's configured size limit?

Summary: A few days ago I hit a bug. The send log showed java.io.IOException: Broken pipe, and digging deeper I found that this exception appears whenever a kafka producer sends a message body larger than the Broker's configured default limit.

A few days ago I hit a bug. The send log showed java.io.IOException: Broken pipe, and digging deeper I found that this exception appears whenever a kafka producer sends a message body larger than the Broker's configured default limit. A single send does not trigger the exception; it only shows up when you keep sending.

This post records what exceptions the Client side and the Broker side each produce when Kafka sends a message exceeding the broker's size limit.

Kafka's Broker Configs include a parameter, message.max.bytes, which specifies the maximum message size the broker will accept (1000012 bytes by default in this version, as the broker log below shows).
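If larger messages are genuinely needed, the limit is raised in the broker's server.properties. A minimal sketch follows, with illustrative values of my own choosing rather than recommendations:

# server.properties (values below are examples, not recommendations)
message.max.bytes=2097152         # largest message the broker will accept (~2 MB)
replica.fetch.max.bytes=2097152   # should be >= message.max.bytes so replicas can still fetch

Consumers have to keep up as well: with the old consumer, fetch.message.max.bytes must be at least as large as the biggest message. The limit can also be overridden per topic through the max.message.bytes topic-level config.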

So, what goes wrong on the Producer side and the Broker side when the Producer sends the Broker a message larger than that configured threshold? Let's test it.
Producer-side test code:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer {

    public static final String brokerList = "10.198.197.59:9092";
    public static final String topic = "versionTopic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", brokerList);

        ProducerConfig config = new ProducerConfig(properties);
        // Fully qualified so it does not clash with this class's own name.
        kafka.javaapi.producer.Producer<Integer, String> producer =
                new kafka.javaapi.producer.Producer<Integer, String>(config);

        // A 1 MiB payload (1048576 bytes of 'x') -- once the message overhead
        // is added, this exceeds the broker's default message.max.bytes (1000012).
        String message = getMessage(1 * 1024 * 1024);

        // Send the oversized message three times; as the log below shows, the
        // failure only surfaces on the sends that reuse the closed socket.
        for (int i = 0; i < 3; i++) {
            KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>(topic, message);
            producer.send(keyedMessage);
            System.out.println("=============================");
        }

        try {
            TimeUnit.SECONDS.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        producer.close();
    }

    // Build a string consisting of msgSize 'x' characters.
    public static String getMessage(int msgSize) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < msgSize; i++) {
            stringBuilder.append("x");
        }
        return stringBuilder.toString();
    }
}
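One thing worth noting: the old Scala producer defaults to request.required.acks=0, so the broker has no channel through which to report the oversized message back to the producer (the broker log below confirms this with "with Ack=0"). As a minimal sketch, assuming the old producer's retry behavior, asking for acknowledgements should make the failure surface on the producer itself:

// Sketch (assumption): with acks set to 1 the broker returns an error
// response for the oversized message, and after exhausting its retries the
// old producer throws kafka.common.FailedToSendMessageException instead of
// only hitting a broken pipe on the next send.
properties.put("request.required.acks", "1");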

Producer output:

2017-02-28 16:19:31 -[INFO] - [Verifying properties] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Property metadata.broker.list is overridden to 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Property serializer.class is overridden to kafka.serializer.StringEncoder] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 0 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:31 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[WARN] - [Failed to send producer request with correlation id 4 to broker 0 with data for partitions [versionTopic,0]] - [kafka.utils.Logging$class:89]
java.io.IOException: An established connection was aborted by the software in your host machine. (Note: on a non-Chinese-locale system the error reads "java.io.IOException: Broken pipe".)
    at sun.nio.ch.SocketDispatcher.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.kafka.Producer.main(Producer.java:30)
2017-02-28 16:19:34 -[INFO] - [Back off for 100 ms before retrying send. Remaining retries = 3] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 5 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:34 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[WARN] - [Failed to send producer request with correlation id 9 to broker 0 with data for partitions [versionTopic,0]] - [kafka.utils.Logging$class:89]
java.io.IOException: An established connection was aborted by the software in your host machine. (Note: on a non-Chinese-locale system the error reads "java.io.IOException: Broken pipe".)
    at sun.nio.ch.SocketDispatcher.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:55)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
    at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
    at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
    at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
	at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
	at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.Producer.send(Producer.scala:77)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.kafka.Producer.main(Producer.java:30)
2017-02-28 16:19:38 -[INFO] - [Back off for 100 ms before retrying send. Remaining retries = 3] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 10 for 1 topic(s) Set(versionTopic)] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-02-28 16:19:38 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
=============================

Note this line in the output: java.io.IOException: An established connection was aborted by the software in your host machine (again, "java.io.IOException: Broken pipe" on a non-Chinese-locale system). The first send appears to succeed because, with acks=0, the producer treats the write as done the moment the bytes leave its socket; the broker then drops the connection, and only the next send, reusing the dead socket, fails with this IOException.

Meanwhile, the Broker side logs this error:

[2017-02-28 16:04:03,384] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:06,466] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 2 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
	at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
	at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:06,467] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)
[2017-02-28 16:04:06,629] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:09,921] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 7 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
	at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
	at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:09,922] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 7, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)
[2017-02-28 16:04:10,096] INFO Closing socket connection to /10.101.48.240. (kafka.network.Processor)
[2017-02-28 16:04:13,374] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 12 from client  on partition [versionTopic,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012.
    at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
	at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
    at kafka.log.Log.append(Log.scala:257)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
	at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
	at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:745)
[2017-02-28 16:04:13,375] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 12, topicAndPartition = [versionTopic,0]] with Ack=0 (kafka.server.KafkaApis)

Note this line in the output: kafka.common.MessageSizeTooLargeException: Message size is 1048602 bytes which exceeds the maximum configured message size of 1000012. Also note the INFO line right after it: because the request was sent with Ack=0, the broker has no way to return an error response, so all it can do is close the connection; that is precisely why the producer sees a connection-level IOException rather than the MessageSizeTooLargeException itself.
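As an aside, the figure 1048602 is easy to reconstruct. The payload is exactly 1 * 1024 * 1024 = 1048576 bytes of "x"; the extra 26 bytes match the per-message overhead of the old (v0) message format, assuming my reading of it is right:

// Worked size check (assumption: v0 message format plus log-entry overhead)
int payload  = 1 * 1024 * 1024;   // 1048576 bytes of 'x'
int overhead = 8 + 4              // log entry: offset + message size
             + 4 + 1 + 1          // crc + magic byte + attributes
             + 4 + 4;             // key length (null key) + value length = 26
int total    = payload + overhead; // 1048602 > 1000012 (message.max.bytes)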


Note: even when Kafka is perfectly healthy, the producer prints INFO lines like these on startup:

2017-03-07 20:06:03 -[INFO] - [Verifying properties] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Property metadata.broker.list is overridden to 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Property serializer.class is overridden to kafka.serializer.StringEncoder] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Fetching metadata from broker id:0,host:10.198.197.59,port:9092 with correlation id 0 for 1 topic(s) Set(testTopic)] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Disconnecting from 10.198.197.59:9092] - [kafka.utils.Logging$class:68]
2017-03-07 20:06:04 -[INFO] - [Connected to 10.198.197.59:9092 for producing] - [kafka.utils.Logging$class:68]
(the producer then sends its data)

Looking at the last three lines, at first glance it seems something went wrong, but these are in fact normal INFO messages. As for why the producer connects, then disconnects, then connects again: judging from the log, the first short-lived connection appears to serve only the metadata fetch, after which a fresh connection is opened for actual producing. I'll verify this against the Kafka source code and explain it in a later post.
