【Kafka】(十二)Kafka 自定义分区器

简介: 【Kafka】(十二)Kafka 自定义分区器

一、默认的分区策略


(1) 如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。


(2) 如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键取 hash 值然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。


二、自定义分区器


为了满足业务需求,你可能需要自定义分区器,例如,通话记录中,给客服打电话的记录要存到一个分区中,其余的记录均分的分布到剩余的分区中。我们就这个案例来进行演示。


(1) 自定义分区器

package com.bonc.rdpe.kafka110.partitioner;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
/**
 * @Content: 自定义分区器
 */
public class PhonenumPartitioner implements Partitioner{
    @Override
    public void configure(Map<String, ?> configs) {
        // TODO nothing
    }
    /**
     * 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区)
     * @param topic 消息队列名
     * @param key 用户传入key
     * @param keyBytes key字节数组
     * @param value 用户传入value
     * @param valueBytes value字节数据
     * @param cluster 当前kafka节点数
     * @return 如果3个节点数 返回 0 1 2 如果5个 返回 0 1 2 3 4 5
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 得到 topic 的 partitions 信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // 模拟某客服
        if(key.toString().equals("10000") || key.toString().equals("11111")) {
            // 放到最后一个分区中
            return numPartitions - 1;
        }
        String phoneNum = key.toString();
        return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1);
    }
    @Override
    public void close() {
        // TODO nothing
    }
}


(2) 使用自定义分区器

package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * @Title PartitionerProducer.java 
 * @Description 测试自定义分区器
 */
public class PartitionerProducer {
    private static final String[] PHONE_NUMS = new String[]{
        "10000", "10000", "11111", "13700000003", "13700000004",
        "10000", "15500000006", "11111", "15500000008", 
        "17600000009", "10000", "17600000011" 
    };
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        // 设置分区器
        props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        int count = 0;
        int length = PHONE_NUMS.length;
        while(count < 10) {
            Random rand = new Random();
            String phoneNum = PHONE_NUMS[rand.nextInt(length)];
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum);
            RecordMetadata metadata = producer.send(record).get();
            String result = "phonenum [" + record.value() + "] has been sent to partition " + metadata.partition();
            System.out.println(result);
            Thread.sleep(500);
            count++;
        }
        producer.close();
    }
}


(3) 测试结果

phonenum [11111] has been sent to partition 2
phonenum [11111] has been sent to partition 2
phonenum [17600000009] has been sent to partition 0
phonenum [17600000011] has been sent to partition 0
phonenum [13700000003] has been sent to partition 1
phonenum [10000] has been sent to partition 2
phonenum [10000] has been sent to partition 2
phonenum [15500000008] has been sent to partition 1
phonenum [10000] has been sent to partition 2
phonenum [17600000009] has been sent to partition 0
目录
相关文章
|
23天前
|
消息中间件 存储 监控
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
深入理解Kafka核心设计及原理(六):Controller选举机制,分区副本leader选举机制,再均衡机制
39 1
|
1月前
|
消息中间件 存储 Kafka
微服务分布问题之Kafka分区的副本和分布如何解决
微服务分布问题之Kafka分区的副本和分布如何解决
|
26天前
|
消息中间件 存储 Kafka
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
面试题Kafka问题之Kafka的消费者(Consumer)跟踪消息如何解决
35 0
|
1月前
|
消息中间件 算法 Kafka
从零开始掌握Kafka Rebalance和分区分配
**Kafka Rebalance详解:**当消费者组成员、订阅主题或分区变化时,集群需重新分配任务。涉及关键点:成员增减、主题数量及分区数变更。Rebalance包括Leader选举、RangeAssignor算法的分区分配,以及创建、删除、修改和查询Topic的基本操作。了解这些有助于优化Kafka集群管理。关注“软件求生”获取更多技术内容!
34 0
|
3月前
|
消息中间件 存储 网络协议
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
Apache Kafka的单分区写入性能在某些严格保序场景中至关重要,但其现有线程模型限制了性能发挥。本文分析了Kafka的串行处理模型,包括SocketServer、KafkaChannel、RequestChannel等组件,指出其通过KafkaChannel状态机确保请求顺序处理,导致处理效率低下。AutoMQ提出流水线处理模型,简化KafkaChannel状态机,实现网络解析、校验定序和持久化的阶段间并行化,提高处理效率。测试结果显示,AutoMQ的极限吞吐是Kafka的2倍,P99延迟降低至11ms。
74 3
Kafka 线程模型痛点攻克: 提升分区写入 2 倍性能
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用合集之支持sink到多分区的kafka ,还能保持有序吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
【4月更文挑战第12天】【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
|
3月前
|
消息中间件 负载均衡 监控
【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
【4月更文挑战第13天】【Kafka】Kafka 创建Topic后如何将分区放置到不同的 Broker 中?
|
3月前
|
消息中间件 运维 监控
【Kafka】分区副本什么情况下会从 ISR 中剔出
【4月更文挑战第12天】【Kafka】分区副本什么情况下会从 ISR 中剔出
|
3月前
|
消息中间件 存储 负载均衡
深度解析Kafka分区策略的精妙之处
深度解析Kafka分区策略的精妙之处
299 1

热门文章

最新文章