kafka几道面试题

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
日志服务 SLS,月写入数据量 50GB 1个月
简介: kafka几道面试题

kafka如何保证数据不丢失

broker端:

Topic 副本因子个数:replication.factor >= 3
同步副本列表(ISR):min.insync.replicas = 2
禁用unclean选举:unclean.leader.election.enable=false

副本因子

Kafka的分区副本包含两种类型:Leader Replica和Follower Replica

所有读写请求都必须发往Leader副本所在的Broker,由该 Broker 负责处理。Follower副本不处理客户端请求,唯一任务是从Leader副本异步拉取消息,并写入到自己的提交日志中,从而实现与Leader副本的同步。

实际生产环境中一般会在可用性和存储硬件之间作出权衡。

副本的分布同样也会影响可用性。默认情况下,Kafka会确保分区的每个副本分布在不同的Broker上,但是如果这些Broker在同一个机器上,一旦机器的交换机发生故障,分区将不可用。所以建议把Broker分布在不同的机器上,可以使用broker.rack参数配置Broker所在机器的名称。
同步副本列表

In-sync replica(ISR)称为同步副本,ISR中的副本都是与Leader副本数据状态同步的副本。

ISR存在哪些副本呢?Leader副本总是存在于ISR中。以及与Leader副本保持了“同步”的follower副本。Kafka的broker端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后于Leader副本的最长时间间隔,默认是10秒。意味着只要follower副本落后于leader副本的时间间隔不超过10秒,就可以认为该follower副本与leader副本是同步的。

可以看出ISR是一个动态的。所以即便是为分区配置了3个副本,还是会出现同步副本列表中只有一个副本的情况(其他副本由于不能够与leader及时保持同步,被移出ISR列表)。如果这个同步副本变为不可用,我们必须在可用性和一致性之间作出选择(CAP理论)。

根据Kafka对可靠性的定义,消息只有在写入所有同步副本之后才被认为是已提交的。但如果这里的“所有同步副本”只包含一个同步副本,那么在这个副本变为不可用时,数据就会丢失。(某副本宕机后,没有副本保存原有数据状态。)

如果要确保已提交的数据被写入不止一个副本,就需要把最小同步副本数量设置为大一点的值。对于一个包含3 个副本的主题分区,如果min.insync.replicas=2,那么至少要存在两个同步副本才能向分区写入数据。(满足这个条件,生产者才能将生产的数据放入消息队列中)

举例:如果进行了上面的配置,此时必须要保证ISR中至少存在两个副本,如果ISR中的副本个数小于2,那么Broker就会停止接受生产者的请求。尝试发送数据的生产者会收到NotEnoughReplicasException异常,消费者仍然可以继续读取已有的数据。

禁用unclean选举

选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election。注意,这里要与在非同步副本中选一个分区作为leader分区的过程区分开,在非同步副本中选一个分区作为leader的过程称之为unclean leader election。由于ISR是动态调整的,所以会存在ISR列表为空的情况,通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程可以通过Broker 端参数 unclean.leader.election.enable控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean Leader 选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。分布式系统的CAP理论说的就是这种情况。

不幸的是,unclean leader election的选举过程仍可能会造成数据的不一致,因为同步副本并不是完全同步的。由于复制是异步完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100,这受到两个参数的影响:

replica.lag.time.max.ms:同步副本滞后leader副本的时间

zookeeper.session.timeout.ms:与zookeeper会话超时时间

简而言之,如果我们允许不同步的副本成为leader,那么就要承担丢失数据和出现数据不一致的风险。如果不允许它们成为leader,那么就要接受较低的可用性,因为我们必须等待原先的Leader恢复到可用状态。(高可用性和数据一致性的平衡)

关于unclean选举,不同的场景有不同的配置方式。对数据质量和数据一致性要求较高的系统会禁用这种unclean的leader选举(比如银行)。如果在可用性要求较高的系统里,比如实时点击流分析系统, 一般不会禁用unclean的leader选举。


producer端:
同步方式下配置:

同步方式下配置:
producer.type=sync
request.required.acks=1
副本数量>=2
增加重试次数

异步方式配置:

producer.type=async 
request.required.acks=1 
queue.buffering.max.ms=5000 
queue.buffering.max.messages=10000 
queue.enqueue.timeout.ms = -1 
batch.num.messages=200
queue.buffering.max.ms=5000
通过buffer来进行控制数据的发送,有两个值来进行控制,缓冲时间阈值与缓冲消息的数量阈值,
如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式

consumner端:

1:关闭自动 offset,手动提交offset
设置 enable.auto.commit = false , 默认值true,自动提交
使用kafka的Consumer的类,用方法consumer.commitSync()提交
或者使用spring-kafka的 Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)
2:另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,
当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。

kafka如何保证数据exactly-once

1) Producer exactly-once
enable.idempotence=true  
分区副本数>= 2
isr >=2
ProducerID+SequenceNumber+Ack=-1(幂等性)
2)Consumer exactly-once
手动维护并提交偏移量。
1:设置enable.auto.commit=false,关闭自动提交偏移量
2:借助外部数据库,如redis的pipeline,mysql的事务机制管理存储偏移量
再同一事物中,在消息被处理完之后在提交偏移量。并更新偏移量。
否则消息需回滚,并获取到上一次偏移量的位置从新进行处理。

kafka发送数据效率低或者超时处理

常见优化方法:

 

1:当然最简单就是增加broker节点,增加分区数量,提高并行度,

2:多线程的方式写入

3:静态配置:

GC相关:

     Java 7引进了G1 垃圾回收,使得GC调优变的没那么难。G1主要有两个配置选项来调优:MaxGCPauseMillis 和 InitiatingHeapOccupancyPercent. Kafka的启动脚本不支持G1算法,需要在环境变量中设置:

export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G -Xmn4G -XX:PermSize=64m -XX:MaxPermSize=128m  -XX:SurvivorRatio=6  -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseCMSInitiatingOccupancyOnly"


OS相关:

1. 因为Kafka数据持久化到硬盘,所以理论上增加磁盘数量,可以增加吞吐量;

     server.properties 中的log.dirs来配置多个硬盘,多个存储路径;

2. 增大socket buffer size:

      Enabling High Performance Data Transfers

      默认值,可以改成4倍大小。

# cat /proc/sys/net/core/rmem_max 
124928
# cat /proc/sys/net/core/wmem_max 
124928

3. 文件描述符:Kafka会使用大量文件和网络socket,所以,我们需要把file descriptors的默认配置改为100000。

#vi /etc/sysctl.conf
fs.file-max = 32000
#vi /etc/security/limits.conf
yourusersoftnofile10000
youruserhardnofile30000

4:动态配置:

unclean.leader.election.enable:不严格的leader选举,有助于集群健壮,但是存在数据丢失风险。

max.message.bytes:单条消息的最大长度。如果修改了该值,那么replica.fetch.max.bytes和消费者的fetch.message.max.bytes也要跟着修改。

cleanup.policy:生命周期终结数据的处理,默认删除。

flush.messages:强制刷新写入的最大缓存消息数。

flust.ms:强制刷新写入的最大等待时长。
min.insync.replicas:如果同步状态的副本小于该值,服务器将不再接受request.required.acks为-1或all的写入请求
ack=1:相较于-1,延时更低,但数据有肯能重复

batch.size:批次大小,默认16k (即每次消息攒够一批的数量大小)

linger.ms:等待时间,修改为5-100ms (如果不设置该值,即消息来了就发送出去)

compression.type:压缩snappy (消息积攒后在发送到broker时采用的压缩算法)

RecordAccumulator:缓冲区大小,修改为64m

import org.apache.kafka.clients.producer.*; 
import java.util.Properties; 
/**
 * 设置核心的参数,提升发送消息的效率
 */
public class ProducerCustomParams {
    public static void main(String[] args) throws Exception {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        System.out.println("开始发送数据");
        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 100; i++) {
            kafkaProducer.send(new ProducerRecord<>("zcy222", "congge " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e)
 {
                    if (e == null) {
                        System.out.println("发送成功");
                        System.out.println("主题:" + metadata.topic());
                        System.out.println("分区:" + metadata.partition());
                    }
                }
            });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

kakfa数据积压问题解决

1、增加broker节点,增加分区数量,提高并行度
2、修改单个消费为批量消费
3、增加单线程消费为线程池异步消费
4、缩短批次时间间隔;
5、老版本SparkStreaming控制消费的速率(spark.streaming.kafka.maxRatePerPartition),可以控制最大的消费速率,在参数中设置;新版本设置背压机制实现消费处理的动态平衡。
6.对代码进行优化,尽可能的一次性计算多个结果,减少shuffer过程;
7.处理的结果如果过多,可以将数据保存到MySQL集群、MongoDB集群【支持事物】或ES【不支持事物】,增大吞吐量
8、消费线程将拉取的消息放到一个滑动窗口中,通过滑动窗口控制拉取的速度
9、对于倾斜的key加以处理,加随机数扽等方式打散
参考:http://niyanchun.com/kafka-multi-thread-consumer.html

kafka reblance机制

reblance触发时机?

    当出现以下几种情况时,Kafka 会进行一次重新分区分配操作,即 Kafka 消费者端的 Rebalance 操作

    ① 同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作

    ② 消费者离开当期所属的 consumer group组。比如 主动停机  或者  宕机

    ③ 分区数量发生变化时(即 topic 的分区数量发生变化时)

    ④ 消费者主动取消订阅


reblance三种策略?

kafka新版本提供了三种rebalance分区分配策略:(partition.assignment.strategy)

  range

   round-robin

    sticky


range 分配策略的原理是按消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能平均的分配给所有的消费者。

假设 n = 分区数/消费者数量,m= 分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。

缺点:容易产生数据倾斜

round-robin分配策略即轮询机。把所有的partition和所有的consumer都列出来,然后按hashcode进行排序,通过轮询算法来分配partition给到各个消费者。

比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、8给一个consumer


sticky策略初始时分配策略与round-robin类似,但是在rebalance的时候,需要保证如下两个原则。

1)分区的分配要尽可能均匀 。

2)分区的分配尽可能与上次分配的保持相同。


 在企业中  如果 topic少选择  range + 粘性;topic多,选择  roundbin + 粘性再平衡时  粘性有用。

粘性分区是Kafka0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。


消费者组的几种状态?

协调者在管理消费者组成员的时候,会对消费者组标定状态。在协调者的视角,消费者组一共有 5 种状态:
● Empty:表示组内没有任何成员,但可能存在已经提交的位移数据,而且这些数据还没有过期。
● Dead:表示组内没有任何成员,而且元信息已经被移除。
● PreparingRebalance:表示准备开始 Rebalance,所有消费成员都需要重新向协调者请求加入消费者组。
● CompletingRebalance:表示所有的成员已经重新加入消费者组,正在等待分配方案。
● Stable:表示稳定状态,也就是完成 Rebalance 后可以正常消费数据的状态。

reblance 5种协议?

JoinGroup请求:consumer请求加入组
SyncGroup请求:group leader把分配方案同步更新到组内所有成员中。
Heartbeat请求:consumer定期向coordinator汇报心跳表明自己依然存活。
LeaveGroup请求:consumer主动通知coordinator自己将要离开consumergroup。
DescribeGroup请求:查看组的所有的所有信息,包括成员信息、协议信息、分配方案、以及订阅信息等。该请求主要供管理员使用,coordinator不使用该请求实现rebalance。

reblance简易流程?

60a6bcefe26f4b118e50f46e4d0afd1d.png

组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest 请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。

协调者,也叫做 Coordinator,它是一个专门为消费者群提供服务的角色,主要负责 Rebalance 的执行、位移管理、组成员管理等。

 

第一阶段:选择组协调器

consumer group在执行rebalance之前必须首先确认coordinator在哪个broker上。并创建与该broker通信的socket连接。确定 coordinator 的算法与确定 offset 被提交到consumer offsets 目标分区的算法是相同的 算法如下:


计算 Math.abs(groupID .hash Code) % offsets. topic.num. partitions 参数值(默认是 50) ,假设是 10

寻找__consumer_offsets 分区 10 leader 副本所在的 broker ,该 broker 即为这group的coordinator

成功连接 coordinator 之后便可以执行 rebalance 操作, 目前 rebalance 主要分为两步:加入组和同步更新分配方


第二阶段:加入消费组JOIN GROUP

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。然后GroupCoordinator 从一个consumer group中选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。


第三阶段( SYNC GROUP)

consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。                


reblance所涉及到的一些参数?

session.timeout.ms

该参数是 Coordinator 检测消费者失败的时间,即在这段时间内客户端是否跟 Coordinator 保持心跳,如果该参数设置数值小,可以更早发现消费者崩溃的信息,从而更快地开启重平衡,避免消费滞后,但是这也会导致频繁重平衡,这要根据实际业务来衡量。

max.poll.interval.ms

消费者处理消息逻辑的最大时间,对于某些业务来说,处理消息可能需要很长时间,比如需要 1分钟,那么该参数就需要设置成大于 1分钟的值,否则就会被 Coordinator 剔除消息组然后重平衡。

heartbeat.interval.ms

该参数跟 session.timeout.ms 紧密关联,前面也说过,只要在 session.timeout.ms 时间内与 Coordinator 保持心跳,就不会被 Coordinator 剔除,那么心跳间隔的时间就是 session.timeout.ms,因此,该参数值必须小于 session.timeout.ms,以保持 session.timeout.ms 时间内有心跳。

kafka性能优化

broker参数调优

num.recovery.threads.per.data.dir
对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:
    器正常启动,用于打开每个分区的日志片段;
    服务器崩溃后重启,用于检查和截短每个分区的日志片段;
    服务器正常关闭,用于关闭日志片段。
默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。
设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。
也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且且log.dir 指定了 3 个路径,那么总共需要 24 个线程。
num.partitions
设置合理的分区数,分区数=Tt /min(Tp,Tc),建议设置3~10个分区
log.retention.ms
Kafka 通常根据时间来决定数据可以被保留多久。默认使用 log.retention.hours 参数来配置时间,默认值为 168 小时,也就是一周。除此以外,还有其他两个参数 log.retention.minutes 和 log.retention.ms 。
这 3 个参数的作用是一样的,都是决定消息多久以后会被删除,不过还是推荐使用 log.retention.ms 。
如果指定了不止一个参数,Kafka 会优先使用具有最小值的那个参数。
log.retention.bytes
另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数log.retention.bytes 来指定,作用在每一个分区上。
也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设为 1GB,那么这个主题最多可以保留 8GB 的数据。
所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。
根据字节大小和时间保留数据
如果同时指定了 log.retention.bytes 和 log.retention.ms (或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。
例如,假设 log.retention.ms 设置为 86 400 000(也就是 1 天),log.retention.bytes 设置为 1 000 000 000(也就是 1GB),
如果消息字节总数在不到一天的时间就超过了 1GB,那么多出来的部分就会被删除。相反,如果消息字节总数小于 1GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于 1GB。
log.segment.bytes
此设置作用在作用在日志片段上。当日志片段大小达到 log.segment.bytes 指定的上限(默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。
这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。
log.segment.ms
此设置指定了多长时间之后日志片段会被关闭。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。
message.max.bytes
此设置用于设置 message.max.bytes 参数来限制单个消息的大小,默认值是 10000 000,也就是 1MB
num.network.threads
创建 Processor 处理网络请求线程个数,建议设置为 Broker 当前CPU核心数*2,这个值太低经常出现网络空闲太低而缺失副本。
num.io.threads
创建 KafkaRequestHandler 处理具体请求线程个数,建议设置为Broker磁盘个数*2。
num.replica.fetchers
建议设置为CPU核心数/4,适当提高可以提升CPU利用率及 Follower同步 Leader数据当并行度
compression.type
建议采用lz4压缩类型,压缩可以提升CPU利用率同时可以减少网络传输数据量。
log.flush.xxx
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
这几个参数表示日志数据刷新到磁盘的策略,应该保持默认配置,刷盘策略让操作系统去完成,由操作系统来决定什么时候把数据刷盘;如果设置来这个参数,可能对吞吐量影响非常大
auto.leader.rebalance.enable
表示是否开启leader自动负载均衡,默认true;我们应该把这个参数设置为false,因为自动负载均衡不可控,可能影响集群性能和稳定。

producer参数调优

ack
kakfa消息语义。
buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full 参数(在 0.9.0.0 版本里被替换成了 max.block.ms ,表示在抛出异常之前可以阻塞一段时间)。
compression.type
默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy 、gzip 或 lz4。
snappy 压缩算法占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。
gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
retries
消息重试机制。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。
batch.size
批量发送消息大小。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。
linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。
默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。
max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,
metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。
max.block.ms
该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。
当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。
在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。
receive.buffer.bytes 和 send.buffer.by
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

consumer端参数调优

fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。
如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。
fetch.max.wait.ms
指定 broker 的等待时间,默认是 500ms。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,
要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,
消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。
该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest 
enable.auto.commit
该属性指定了消费者是否自动提交偏移量,默认值是 true 。为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由自己控制何时提交偏移量。
如果把它设为 true ,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。
partition.assignment.strategy
通过设置partition.assignment.strategy 来选择分区策略,有Range,RoundRobin,sticky三种策略。
max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
eceive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,
可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

参考链接:http://t.zoukankan.com/luckyhui28-p-12001833.html

kafka 分区数设置

kafka分区数设置?

可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 =  Tt / min(Tp, Tc)


说明:Tp表示producer的吞吐量。测试producer通常是很容易的,因为它的逻辑非常简单,就是直接发送消息到Kafka就好了。Tc表示consumer的吞吐量。测试Tc通常与应用的关系更大, 因为Tc的值取决于你拿到消息之后执行什么操作,因此Tc的测试通常也要麻烦一些。


数据发往kafka分区规则?

key 和 value 的类型,一般都用字符串即可。 数据到底写入到哪一个分区中:

   如果指定了分区,就写入到指定的分区中

   如果没有指定分区,指定了 key,按照 key 的 hashcode,取模,写入对应的分区

   如果没有指定分区和 key,轮询机制

kafka producer buffer pool

Kafka通过使用内存缓冲池的设计,让整个发送过程中的存储空间循环利用,有效减少JVM GC造成的影响,从而提高发送性能,提升吞吐量。


参考:https://blog.51cto.com/u_15127701/2870737
         https://mp.weixin.qq.com/s/P6BO5KoMl_NQAI_OcwnXrQ

kafka 时间轮

kakfa通过时间轮来处理延迟任务,只将时间轮的槽保存到延迟队列,大大的减少了延迟队列的元素数量,这样对于元素的增加删除性能有很大提高;
kafka通过阻塞的方式poll延迟队列的,减少了大量的空转;
为了保证线程安全,灵活运用读写锁、原子对象、synchronized控制时间轮的操作;

链接:https://mp.weixin.qq.com/s/797CTDY4VXIcoEWMU0zL0w

kafka为什么这么快?

60a6bcefe26f4b118e50f46e4d0afd1d.png










相关文章
|
2月前
|
消息中间件 存储 负载均衡
Kafka面试题及答案
Kafka面试题及答案
|
2月前
|
消息中间件 算法 Java
面试官:Kafka中的key有什么用?
面试官:Kafka中的key有什么用?
45 3
面试官:Kafka中的key有什么用?
|
3月前
|
消息中间件 算法 Kafka
面试题Kafka问题之Kafka的副本消息同步如何解决
面试题Kafka问题之Kafka的副本消息同步如何解决
58 4
|
3月前
|
消息中间件 存储 缓存
面试题Kafka问题之Kafka的生产消费基本流程如何解决
面试题Kafka问题之Kafka的生产消费基本流程如何解决
37 1
|
3月前
|
消息中间件 Kafka
面试题Kafka问题之Kafka【线上】积压消费如何解决
面试题Kafka问题之Kafka【线上】积压消费如何解决
23 0
|
3月前
|
消息中间件 算法 NoSQL
面试题Kafka问题之Kafka保证系统的可用性如何解决
面试题Kafka问题之Kafka保证系统的可用性如何解决
33 0
|
3月前
|
消息中间件 Kafka 数据库
面试题Kafka问题之查看偏移量为23的消息如何解决
面试题Kafka问题之查看偏移量为23的消息如何解决
27 0
|
3月前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
49 0
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
86 9
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
下一篇
无影云桌面