【Kafka从入门到成神系列 三】Kafka 生产者消息分区及压缩算法

简介: 【Kafka从入门到成神系列 三】Kafka 生产者消息分区及压缩算法

、生产者消息分区机制

当我们在使用 Kafka 时,我们肯定希望将数据均匀的分配到所有服务器上。这样,我们的负载均衡就变的及其完美。

1. 分区原因

简单来说,Kafka 的消息组织方式结构:主题 - 分区 - 副本 - 消息

一条消息,只能保存到一个分区内,不会在多个分区保存多份。

简单想想,为什么我们的 Kafka 已经有 Topic 了,还需要做一个分区出来呢?

主要的原因在于:实现系统的高伸缩性,不同的分区能够放置到不同节点的机器上,我们可以通过添加机器增加整体系统的吞吐量

当然,这里也可以使用 AKF 来进行解释:

AKF 立方体也叫做scala cube,它在《The Art of Scalability》一书中被首次提出,旨在提供一个系统化的扩展思路。

AKF 把系统扩展分为以下三个维度:

  • X 轴:直接水平复制应用进程来扩展系统。
  • Y 轴:将功能拆分出来扩展系统。
  • Z 轴:基于用户信息扩展系统。

与我们 Kafka 相对应:

  • X 轴:使用可靠的副本机制
  • Y 轴:不同的功能拆分—Topic
  • Z轴:不同的用户消费数据—partition

分区的概念很早之前就已经引进了,比如:

  • MongoDBElasticsearch 叫做分片 Shard
  • HBase 叫做 Region
  • Cassandra 叫做 vnode

. 分区策略

Kafka 的分区策略一般为 轮询、随机、按 key 值。当然,我们也可以自定义分区策略。

2.1 轮询策略

顺序分配。比如一个主题下面有 3 个分区,消息分配的格式如下:

Kafka 默认是轮询策略。轮询策略有非常优秀的负载均衡表现,总能保证消息最大限度的平均分配到所有分区上。

2.2 随机策略

随机的将我们的消息放置到任意一个分区上。

实现方式:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 根据分区的大小,随机生成
return ThreadLocalRandom.current().nextInt(partitions.size());

如果想追求数据的均匀分布,还是使用轮询策略比较好。

2.3 按消息键保存策略

kafka 允许为每套消息定义消息键,简称 Key

这个 key 可以是一个有明确业务含义的字符串,如客户代码、部门编号、业务ID等,我们可以将相同的 key 分到一个分区。

实现方式:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 相同的hash分到一个分区
return Math.abs(key.hashCode()) % partitions.size();

2.4 其他分区策略

如果上述三种策略不能支撑你的业务发展,那么你可以尝试自定义你自己的策略。

我们需要实现 public interface Partitioner extends Configurable, Closeable {} 该接口,定义自己的分区策略

public class UserDefinePartitioner implements Partitioner {
    private AtomicInteger counter = new AtomicInteger(0);
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取该Topic下所有的分区数
        List<PartitionInfo> partitioners = cluster.partitionsForTopic(topic);
        int numPartitioners = partitioners.size();
        if (keyBytes == null) {
            // 原子性增加一
            int addIncrement = counter.getAndIncrement();
            // 取模
            return addIncrement % numPartitioners;
        } else {
            // 对keyBytes进行Hash然后取模
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitioners;
        }
    }
    @Override
    public void close() {
        System.out.println("close");
    }
    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("configure");
    }
}

2.5 案例展示

目前我们有一批数据,这一批数据有因果关系,所以我们处理这一批数据必须要保持其 有序性。否则我们先处理了果,再处理因,肯定会出现业务问题。

我们使用最简单的方法,建立一个 Topic,Topic 只含有一个分区,将我们的因和果发到 Topic,实现顺序性。这样做虽然实现了顺序性,但是丧失了 Kafka 带来的高吞吐了和负载均衡能力。

我们将我们因的 key 设置为 because将果的key 设置成 result,重写我们的分配策略 return Math.abs(key.hashCode()) % partitions.size();。这样,我们的生产者会将因和果分别发送到不同的分区,不同的消费者指定消费不同的分区consumer.assign()),实现相应的业务逻辑。

二、生产者压缩算法

压缩(comparession),用时间换取空间的经典 trade-off 思想,具体来说,就是用 CPU 时间去换取磁盘空间或网络 I/O 传输量,希望以较小的 CPU 开销带来更少的磁盘占用或更少的网络 I/O 传输。

1. 怎样压缩

kafka 有两个类消息格式,分别为 v1 和 v2 版本。

Kafka 的消息层次分为两次:消息集合(message set)以及消息(message)。一个消息集合包含多个日志项(record item),日志项是真正封装消息的地方。

v1:message set、message

v2:record batch、record

我们的 v2 版本将消息的公共部分抽取出来放到了外层消息集合里面,这样不需要每一条消息都保存这些数据了。


我们原来的 V1 版本中,每条消息都需要执行 CRC 校验,但有些情况下消息的 CRC 值是会变化的。比如在 Broker 端可能对时间戳字段进行更新或执行消息格式转化更新 CRC 值,所以,我们没条消息都执行 CRC 检验没必要,不仅浪费空间还浪费时间。因此在 V2 版本中,消息的 CRC 校验工作就被移动到了消息集合这一层。


V1 版本保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息字段中;V2 版本的做法是对整个消息集合进行压缩。


性能测试:

9e6efdc45bf02b5edb58846c9fe3ed3e.png

2. 何时压缩

压缩可能发生的地方:生产者端、Broker 端

生产者可配置 props.put("compression.type", "gzip"); 即可开启 gzip 压缩

在生产者端启用压缩是很自然的想法,那为什么我说在 Broker 端也可能进行压缩呢?

大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存。有两种例外情况就可能让 Broker 重新压缩消息。

Broker 指定了和 Producer 端不同的压缩算法

这是是生产者和 Broker 的关联。一旦在 Broker 端设置了不同的 compression.type 值,就一定要小心了,因为可能会发生预料之外的压缩 / 解压缩操作,通常表现为 Broker 端 CPU 使用率飙升。

Broker 端发生了消息格式转换。一般是 V1 和 V2 消息版本转换带来的。会使其丧失零拷贝的功效。

这里指的是消费者去 Broker 消费消息时,我们 Broker 消息版本和消费者的消息版本不一致,需要从内核态切换到用户态,将消息进行格式转换,随后发给我们的消费者。

3. 何时解压缩

解压缩通常发生在消费者。一般情况下,我们会将压缩算法封装到消息集合中,消费者通过压缩算法进行解压缩。

Producer 端压缩、Broker 端保持、Consumer 端解压缩。

Broker 也会进行解压缩,每个压缩过的消息集合在 Broker 端写入时,都要发生解压缩操作,目的是为了对消息执行各种验证。但这种操作对CPU性能严重降低。

国内京东的小伙伴们刚刚向社区提出了一个 bugfix,建议去掉因为做消息校验而引入的解压缩。据他们称,去掉了解压缩之后,Broker 端的 CPU 使用率至少降低了 50%。

4. 压缩算法对比

下面这张表是 Facebook Zstandard 官网提供的一份压缩算法 benchmark 比较结果:

  • 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP
  • 压缩比方面:zstd > LZ4 > GZIP > Snappy
  • 网络带宽:使用 Snappy 算法占用的网络带宽最多,zstd 最少
  • CPU使用率:压缩时 Snappy 算法使用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能使用更多的 CPU。

当你的CPU资源充足、环境带宽有限时,我建议你开启压缩。毕竟带宽比 CPU 更稀缺。压缩算法一般选择 zstd









相关文章
|
5月前
|
消息中间件 Linux Kafka
linux命令使用消费kafka的生产者、消费者
linux命令使用消费kafka的生产者、消费者
245 16
|
8月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
188 61
|
7月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
265 10
|
7月前
|
机器学习/深度学习 算法 机器人
强化学习:时间差分(TD)(SARSA算法和Q-Learning算法)(看不懂算我输专栏)——手把手教你入门强化学习(六)
本文介绍了时间差分法(TD)中的两种经典算法:SARSA和Q-Learning。二者均为无模型强化学习方法,通过与环境交互估算动作价值函数。SARSA是On-Policy算法,采用ε-greedy策略进行动作选择和评估;而Q-Learning为Off-Policy算法,评估时选取下一状态中估值最大的动作。相比动态规划和蒙特卡洛方法,TD算法结合了自举更新与样本更新的优势,实现边行动边学习。文章通过生动的例子解释了两者的差异,并提供了伪代码帮助理解。
434 2
|
8月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
295 5
|
10月前
|
存储 人工智能 自然语言处理
Delta-CoMe:清华联合OpenBMB等高校开源的新型增量压缩算法
Delta-CoMe是由清华大学NLP实验室联合OpenBMB开源社区、北京大学和上海财经大学提出的新型增量压缩算法。该算法通过结合低秩分解和低比特量化技术,显著减少了大型语言模型的存储和内存需求,同时保持了模型性能几乎无损。Delta-CoMe特别适用于处理数学、代码和多模态等复杂任务,并在推理速度上有所提升。
266 6
Delta-CoMe:清华联合OpenBMB等高校开源的新型增量压缩算法
|
11月前
|
消息中间件 负载均衡 Kafka
【赵渝强老师】Kafka的主题与分区
Kafka 中的消息按主题分类,生产者发送消息到特定主题,消费者订阅主题消费。主题可分多个分区,每个分区仅属一个主题。消息追加到分区时,Broker 分配唯一偏移量地址,确保消息在分区内的顺序性。Kafka 保证分区有序而非主题有序。示例中,Topic A 有 3 个分区,分区可分布于不同 Broker 上,支持负载均衡和容错。视频讲解及图示详见原文。
242 2
|
11月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?
|
11月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
331 2
|
12月前
|
消息中间件 监控 负载均衡
在Kafka中,如何进行主题的分区和复制?
在Kafka中,如何进行主题的分区和复制?

热门文章

最新文章