一文搞懂Kafka核心基础知识

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
日志服务 SLS,月写入数据量 50GB 1个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。Kafka 官网的下载地址是 https://kafka.apache.org/downloads ;打开下载页面后我们可以看 到不同版本的

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

「下面这篇文章会从以下方面会带大家介绍下这个强大的开源项目,希望对大家收获」


快速入门

版本介绍

Kafka 官网的下载地址是 https://kafka.apache.org/downloads ;打开下载页面后我们可以看 到不同版本的 Kafka 二进制代码压缩包

如图,当前最新的 Kafka 版本是 2.6.0,提供了两个二进制压缩包可供下载 。

  • katka_2.12 2.6.0.tgz
  • katka_2.13-2.6.0.tgz

上面两个文件中的 2.12 /2.13 分别表示编译 Kafka 的 Scala 语言版本,后面的 2.6.0 是 Kafka 的版本 。

其中前面的 2 表示大版本号,即 Major Version;中间的 6 表示小版本号或次版本号,即 Minor Version;最后的 0 表示修订版 本号,也就是 Patch 号。


安装教程

由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。

还有kafka也依赖zookeeper,所以需要先安装zookeeper

wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.5.8.tar.gz  
tar -zxvf zookeeper-3.5.8.tar.gz  
cd zookeeper-3.5.8.tar.gz  
# 启动zookeeper
bin/zkServer.sh start
bin/zkCli.sh 
ls /   #查看zk的根目录相关节点

「下载安装包」

下载并解压:

wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar -xzf kafka_2.12-2.6.0.tgz
cd kafka_2.12-2.6.0

「启动服务」

bin/kafka-server-start.sh -daemon config/server.properties
# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh 
ls /  #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点


消息引擎模型

「我们用一句话概括Kafka就是它是一款开 源的消息引擎系统。」

其中最常见的两种消息引擎模型是点对点模型和发布/订阅模型

「点对点模型」

点对点模型是基于队列提供消息传输服务的,该模型定义了消息队列、发送者和接收者 , 提供了一种点对点的消息传递方式,即发送者发送每条消息到队列的指定位置,接收者从指定位置获取消息,一旦消息被消费, 就会从队列中移除该消息 。 每条消息由一个发送者生产出来, 且只被一个消费者处理一一发送者和消费者之间是一对一的关系

「发布/订阅模型」

发布/订阅模型与前一种模型不同, 它有主题(topic)的概念。 这种模型也定义了类似于生产者/消费者这样的角色,即发布者和订阅者,发布者将消息生产出来发送到指定的topic中, 所有订阅了该 topic的订阅者都可以接收到该topic下的所有消息,通常具有相同订阅 topic 的所有订阅者将接收到 同样的消息

「Kafka 同时支持这两种消息引擎模型的,后面会介绍」


基本概念

消息

既然Kafka是消息引擎,这里的消息就是指 Kafka 处理的主要对象

Broker

broker 指一个 kafka 服务器。如果多个 broker 形成集群会依靠 Zookeeper 集群进行服务的协调管理。

生产者发送消息给 Kafka 服务器。消费者从 Kafka 服务器读取消息。

Topic和Partition

topic代表了一类消息, 也可以认为是消息被 发送到的地方。 通常我们可以使用 topic 来区分实际业务, 比如业务 A 使用 一个 topic , 业务 B 使用另外一个 topic。

Kafka 中的 topic 通常都会被多个消费 者订阅, 因此出于性能的考量 , Kafka 并不是 topic-message 的两级结构, 而是采用了 topic-partition-messa ge 的三级结构来分散负 载。 从本质上说, 每个 Kafka topic 都由若干个 partition 组成

如图: topic 是由多个 partition 组成的, 而 Kafka 的 partition 是不可修改的有序消 息序列, 也可以说是 有序的消息日志。 每个 partition 有自己专属的 partition 号, 通常是从 0 开始的。 用户对 partition 唯一能做的操作就是 在消息序列的尾部追 加写入消息。 partition 上的每条消息都会被分配一个唯一 的序列号

该序列号被称为位移( offset ) 是从 0 开始顺序递增 的整数。 位移信息可以 唯一定位到某 partition 下的一条消息 。

「kafka为什么要设计分区?」

解决伸缩性的问题。假如一个broker积累了太 多的数据以至于单台 Broker 机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就 是,能否把数据分割成多份保存在不同的 Broker 上?所以kafka设计了分区

生产者和消费者

向主题发布消息的客户端应用程序称为生产者(Producer),生产者程序通常持续不断地 向一个或多个主题发送消息,而订阅这些主题消息的客户端应用程序就被称为消费者 (Consumer)。和生产者类似,消费者也能够同时订阅多个主题的消息

消费者组

Consumer Group 是指组里面有多个消费者或消费者实例,它 们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的 所有消费者协调在一起来消费订阅主题的所有分区(Partition)。当然,每个分区只能由 同一个消费者组内的一个 Consumer 实例来消费。

「Consumer Group 三个特性。」

  1. Consumer Group 下可以有一个或多个 Consumer 实 例。
  2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识 唯一的一个 Consumer Group。
  3. Consumer Group 下所有实例订阅的主题的单个分区, 只能分配给组内的某个 Consumer 实例消费。这个分区 当然也可以被其他的 Group 消费。

「还记得上面提到的两种消息引擎模型」

Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引 擎系统的两大模型:如果所有实例都属于同一个 Group, 那么它实现的就是点对点模型;如果所有实例分别属于不 同的 Group,那么它实现的就是发布 / 订阅模型。

「在实际使用场景中,一个 Group 下该有多少个 Consumer 实例呢?」

理想情况下, Consumer 实例的数量应该等于该 Group 订阅主题的分区 总数。

举个简单的例子,假设一个 Consumer Group 订阅了 3 个 主题,分别是 A、B、C,它们的分区数依次是 1、2、3, 那么通常情况下,为该 Group 设置 6 个 Consumer 实例是 比较理想的情形,因为它能最大限度地实现高伸缩性。

消费顺序问题

按照上面的设计,可能会导致消费顺序问题,下面一一介绍

「乱序场景一」

因为一个topic可以有多个partition,kafka只能保证partition内部有序

当partition数量=同一个消费者组中消费者数量时,可能需要顺序的数据分布到了不同的partition,导致处理时乱序

「解决方案」

1、可以设置topic 有且只有一个partition

2、根据业务需要,需要顺序的 指定为同一个partition

「乱序场景二」

对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序

「解决方案」

消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作

Rebalance

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

「Consumer Group 何时进行 Rebalance 呢? Rebalance 的触发条件有 3 个。」

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组 或者离开组,或是有 Consumer 实例崩溃被“踢 出”组。
  1. 订阅主题数发生变更。Consumer Group 可以使用正则 表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”)) 就表 明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主 题。在 Consumer Group 的运行过程中,你新创建了一 个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  1. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一 个主题的分区数。当分区数增加时,就会触发订阅该主题 的所有 Group 开启 Rebalance。

Rebalance 过程对 Consumer Group 消费过程有极 大的影响。会stop the world,简称 STW。我们知道在 STW 期间,所有应用线程都会停止工作,表现为 整个应用程序僵在那边一动不动。Rebalance 过程也和这个 类似,在 Rebalance 过程中,所有 Consumer 实例都会停 止消费,等待 Rebalance 完成。这是 Rebalance 为人诟病 的一个方面。

Offset

前面说过,topic partition 下的每条消息都被分配一个位移值。 实际上 ,Kafka消费者端也有位移( offset)的概念, 但一定要注意 这两个offset 属于不同的概念

显然, 每条消息在某个 partition 的位移是固定的, 但消费该 partition 的消费者的位移会随 着消费进度不断前移

Replica

既然我们己知 partition 是有序消息日志, 那么一定不能只保存这一份日志,否则一旦保存 partition 的 Kafka 服务器挂掉了, 其上保存的消息也就都丢失了。 分布式系统必然要实现高可靠性, 而目前实现的主要途径还是依靠冗余机制,通过备份多份日志 。 这些备份日志在 Kafka 中被称为副本( replica ),它们存在的唯一目的就是防止数据丢失

「副本分为两类 :」

领导者副本( leader replica )和追随者副本( follower replica )。

follower replica 是不能提供服务给客户端的,也就是说不负 责响应客户端发来的消息写入和消息消费请求。它只是被动地向领导者副本( leader replica )获取数据, 而 一旦 leader replica 所在的 broker 岩机, Kafka 会从剩余的 replica 中选举出新的 leader 继续提供服务。

Leader和Follower

前面说的, Kafka 的 replica 分为两个角色:领导者( leader )和追随者( follower ) 。 Kafka 保证同一个 partition 的多个 replica 一定不会分配在同一台 broker 上 。 毕竟如果同一个 broker 上有同一个 partition 的多个 replica, 那么将无法实现备份冗余的效果。

ISR

ISR 的全称是 in-sync replica,翻译过来就是与 leader replica 保持同步的 replica 集合 。

Kafka 为 partition 动态维护一个 replica 集合。该集合中的所有 replica 保存的消息日志都与leader replica 保持同步状态。只有这个集合中的 replica 才能被选举为 leader,也只有该集合中 所有 replica 都接收到了同一条消息, Kafka 才会将该消息置于“己提交”状态,即认为这条消 息发送成功。

如果因为各种各样的原因,一小部分 replica 开始落后于 leader replica 的进度 。当滞后 到 一定程度时, Kafka 会将这些 replica “踢”出 ISR。相反地,当这些 replica 重新“追上”了 leader 的进度时 , 那么 Kafka 会将它们加 回到 ISR 中。这一切都 是自动维护的, 不需要用户进行人工干预。

「最后用2张图来展示上面提到的这些概念以及运行流程:」


使用场景

日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。

消息系统:解耦和生产者和消费者、缓存消息等。

用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。

运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

下面是一个日志方面的典型使用场景。

KAFKA为什么快

「顺序读写」

kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能

顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写

「零拷贝」

服务器先将文件从复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制

Zero copy对应的是Linux中sendfile函数,这个函数会接受一个offsize来确定从哪里开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏移量来使用零拷贝读取指定内容的数据返回给消费者

「分区」

kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力

「批量发送」

kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka

  1. 等消息条数到固定条数
  2. 一段时间发送一次

「数据压缩」

Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。

压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得


基本使用

Java客户端访问Kafka

下面介绍使用Java客户端访问Kafka

引入maven依赖

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.6.0</version>
</dependency>

消息发送端代码

public class MsgProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094");
        /*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader
        又挂掉,则消息会丢失。
        (3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。
         这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
        */
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        //发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //重试间隔设置
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //kafka本地线程会从缓冲区取数据,批量发送到broker,
        //设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        //一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去
        //如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        //把发送的key从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        Producer<String, String> producer = new KafkaProducer<>(props);
        int msgNum = 5;
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);
            //指定发送分区
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("order-topic"
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));
            //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
            /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("my-replicated-topic"
                    , order.getOrderId().toString(), JSON.toJSONString(order));*/
            //等待消息发送成功的同步阻塞方法
         /*RecordMetadata metadata = producer.send(producerRecord).get();
         System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                 + metadata.partition() + "|offset-" + metadata.offset());*/
            //异步方式发送消息
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("发送消息失败:" + exception.getStackTrace());
                    }
                    if (metadata != null) {
                        System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

消息接收端代码

public class MsgConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.60:9092,192.168.0.60:9093,192.168.0.60:9094");
        // 消费分组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        // 是否自动提交offset
      //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      // 自动提交offset的间隔时间
      props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      /*
      心跳时间,服务端broker通过心跳确认consumer是否故障,如果发现故障,就会通过心跳下发
      rebalance的指令给其他的consumer通知他们进行rebalance操作,这个时间可以稍微短一点
      */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        //服务端broker多久感知不到一个consumer心跳就认为他故障了,默认是10秒
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        /*
        如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
        会将其踢出消费组,将分区分配给别的consumer消费
        */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 消费主题
        String topicName = "order-topic";
        //consumer.subscribe(Arrays.asList(topicName));
        // 消费指定分区
        //consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
        //消息回溯消费
        consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));
        //指定offset消费
        //consumer.seek(new TopicPartition(topicName, 0), 10);
        while (true) {
            /*
             * poll() API 是拉取消息的长轮询,主要是判断consumer是否还活着,只要我们持续调用poll(),
             * 消费者就会存活在自己所在的group中,并且持续的消费指定partition的消息。
             * 底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session.
             * timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了,
             * 这个Partition也会被重新分配给其他consumer
             */
            ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                        record.value());
            }
            if (records.count() > 0) {
                // 提交offset
                consumer.commitSync();
            }
        }
    }
}

Spring Boot整合Kafka

引入spring boot kafka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml配置如下:

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: 192.168.0.60:9092,192.168.0.60:9093
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: mygroup
      enable-auto-commit: true

发送者代码:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @RequestMapping("/send")
    public void send() {
        kafkaTemplate.send("mytopic", 0, "key", "this is a msg");
    }
}

消费者代码:

@Component
public class MyConsumer {
    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *             @TopicPartition(topic = "topic2", partitions = "0",
     *                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     *     },concurrency = "6")
     *  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
     * @param record
     */
    @KafkaListener(topics = "mytopic",groupId = "testGroup")
    public void listen(ConsumerRecord<String, String> record) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
    }
}


常用参数配置

Broker端参数

log.dirs:指定了 Broker 需要 使用的若干个文件目录路径。

auto.create.topics.enable:是否允许自动创建 Topic。

unclean.leader.election.enable:是否允许 Unclean Leader 选举。

「Unclean 领导者选举(Unclean Leader Election)」

既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副 本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新 选举一个新的 Leader。可是 ISR 是空,此时该怎么选举新 Leader 呢?

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟, 这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过 程称为 Unclean 领导者选举。

开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直 存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举 的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。

auto.leader.rebalance.enable:是否允许定期进 行 Leader 选举。

auto.create.topics.enable:是否允许自动创建 Topic。

log.retention.hourminutes|ms:都是控制一条消息数据被保存多长时间。从优先 级上来说 ms 设置最高、minutes 次之、hour 最低。

log.retention.bytes:这是指定 Broker 为消息保存 的总磁盘容量大小。

message.max.bytes:控制 Broker 能够接收的最大消 息大小。

Topic级别参数

Topic 级别参数会覆盖全局 Broker 参数的值,而每个 Topic 都能设置自己的参数值

retention.ms:规定了该 Topic 消息被保存的时长。 默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦 设置了这个值,它会覆盖掉 Broker 端的全局参数值。

retention.bytes:规定了要为该 Topic 预留多大的磁 盘空间。和全局参数作用相似,这个值通常在多租户的 Kafka 集群中会有用武之地。当前默认值是 -1,表示可以 无限使用磁盘空间。


总结

本文是KAFKA相关的最核心基础的知识,基本可以带大家入门了,当然,Kafka还有很多高级特性,如幂等,事务,压缩,流处理等,以及常见的消息丢失,重复,堆积等问题的解决方案和配置,因为篇幅有限,今后会陆续编写相应的文章讲解


「觉得不错,记得点赞,转发,分享,谢谢」


最后

微信搜索:月伴飞鱼

1.每天分享一篇实用的技术文章,对面试,工作都有帮助

2.后台回复666,获得海量免费电子书籍,会持续更新

文章参考:

https://kafka.apache.org/

Apache Kafka实战(胡夕)

相关文章
|
6月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
258 2
|
6月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
214 2
|
6月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
131 1
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
46 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
269 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
67 3
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
135 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。