连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?

简介: 连Producer端的主线程模块运行原理都不清楚,就敢说自己精通Kafka?

前言

在介绍Producer端原理之前,大家先对其整体架构有一个大致的了解,图示如下所示:

这个图看不懂没有关系,我们会在介绍Producer端原理时一一介绍每个部分的含义及其所复杂的功能。

Main Thread(主线程)

在Main Thread中,一共分为四个步骤,分别是:KafkaProducer(Kafka生产端)、Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器);

那么在上个章节中,我们介绍了KafkaProducer端的一些重要参数和使用方式。

本章,就主要针对剩余的3个部分:Interceptor(拦截器)、Serializer(序列化器)和Partitioner(分区器)进行讲解。

1> Interceptor拦截器

Kafka中一共存在两种拦截器,分别是:生产者拦截器ProducerInterceptor)和消费者拦截器ConsumerInterceptor

我们来看一下生产者拦截器的接口定义了哪些方法,如下所示:

public interface ProducerInterceptor<K, V> extends Configurable {
    /** KafkaProducer会在【将消息序列化】和【计算分区】之前调用该方法,来对消息进行相应的定制化操作 */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    /** KafkaProducer会在【消息被应答之前/消息发送失败】时调用该方法 */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    /** 关闭拦截器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    void close();
}

在ProducerRecord类中,包含了我们发送消息所需要和信息,这些信息我们都可以在 onSend(ProducerRecord<K, V> record) 方法中进行修改,比如,在发送消息前修改ProducerRecord中的value值,从而改变消息内容。但是,要注意最好不要修改topickeypartition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。如下就是ProducerRecord类中包含的待发送消息的属性列表;

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    ... ...
}

那么在ProducerRecord类的 onAcknowledgement(RecordMetadata metadata, Exception exception) 方法中,有如下规律:

消息发送成功】metadate不为null,exception为null

消息发送失败】metadate为null,exception不为null

所以,我们可以根据上面的规律来判断有哪些消息发送成功,有哪些消息是发送失败了。对于RecordMetadata类中,包含的发送成功后的“回执”信息,如果想要在源码及注释如下所示:

public final class RecordMetadata {
    public static final int UNKNOWN_PARTITION = -1;
    private final long offset; // 消息的偏移量
    private final long timestamp; // 时间戳
    private final int serializedKeySize; // key的序列化长度
    private final int serializedValueSize; // value的序列化长度
    private final TopicPartition topicPartition; // 主题所在分区
    ... ...
}

2> Serializer序列化器

由于Producer端发送消息给Kafka之后,待传输的消息对象obj是需要被转换成 字节数组byte[] 之后才能在网络中传送,所以,此处必不可少的一个步骤就是序列化器Serializer了。而在Consumer端,需要将接收到的字节数组byte[] 再转换成对象obj,那么这个步骤就是反序列化器Deserializer了。

Kafka在org.apache.kafka.common.serialization目录下提供了多种类型预置的序列化器/反序列化,具体如下所示:

Deserializer、Serializer、ByteArrayDeserializer、ByteArraySerializer

ByteBufferDeserializer、ByteBufferSerializer、BytesDeserializer、BytesSerializer

DoubleDeserializer、DoubleSerializer、FloatDeserializer、FloatSerializer

IntegerDeserializer、IntegerSerializer、ListDeserializer、ListSerializer

LongDeserializer、LongSerializer、ShortDeserializer、ShortSerializer

StringDeserializer、StringSerializer、UUIDDeserializer、UUIDSerializer

VoidDeserializer、VoidSerializer

那么由于本章主要介绍的是Producer端的执行原理,所以我们此时只需关注序列化器Serializer,该接口如下所示:

public interface Serializer<T> extends Closeable {
    /** 配置当前类 */
    default void configure(Map<String, ?> configs, boolean isKey) {
    }
    /** 将对象data转换为字节数组 */
    byte[] serialize(String topic, T data);
    /** 将对象data转换为字节数组 */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
    /** 关闭序列化器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    @Override
    default void close() {
    }
}

对于需要实现序列化操作,只需要实现Serialize接口中的方法接口,我们以StringSerializer为例,看一下它是如何实现的,代码如下所示:

public class StringSerializer implements Serializer<String> {
    private String encoding = StandardCharsets.UTF_8.name(); // 默认编码为UTF-8
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        // 首先尝试从configs中获得"key.serializer.encoding"或"value.serializer.encoding"所配置的值
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            // 如果没配置,则尝试从configs中获得"serializer.encoding"所配置的值
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue; // 如果配置了自定义编码,则赋值给encoding;否则为默认的UTF-8
    }
    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null) return null;
            else 
                return data.getBytes(encoding); // 通过调用String的getBytes方法获得字节数组
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException(...);
        }
    }
}

StringSerializer类中,序列化方式非常简单,就是通过调用StringgetBytes方法获得字节数组;除此之外,也可以配置自定义编码。配置方式可以通过向configs中设置key为:"key.serializer.encoding"、"value.serializer.encoding"、"serializer.encoding"这三种,其中serializer.encoding的优先级最低。如果没有配置这3个key,则 默认编码类型就是"UTF-8"

如果Kafka内置的这几种序列化器都不满足需求,则可以自己实现自定义序列化器(例如:MuseSerializer),然后使用时,在properties配置中指定即可:

Properties properties = new Properties();

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MuseSerializer.class.getName());

3> Partitioner分区器

构造ProducerRecord实例对象时,如果在构造方法中指定了partition字段,那么就不需要分区器了;否则,就需要Partitioner分区器来根据key字段计算分区值。ProducerRecord的构造函数如下所示:

当我们没有在ProducerRecord的构造函数中指定partition字段的时候,就需要分区器起作用了,所有的分区器都需要实现接口Partitioner,该接口有如下三个方法:

public interface Partitioner extends Configurable, Closeable {
    /** 计算给定记录的分区 */
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    /** 关闭分区器(由于此方法可能被KafkaProducer调用多次,所以必须是幂等的)*/
    void close();
    /** 通知分区器即将创建一个新的批处理。当使用sticky分区器时,此方法可以为新批更改选择的sticky分区 */
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

在Kafka中默认的分区器是DefaultPartitioner。这里有两条逻辑判断分支,即:keyBytes是否为nullkeyBytes就是key的字节数组

keyBytes不为null】对keyBytes进行murmur2哈希计算,然后再基于指定Topic下的所有分片总数进行取余寻址计算。

keyBytes为null】需要调用StickyPartitionCache的partition(...)方法进行计算。

分区逻辑如下所示:

public class DefaultPartitioner implements Partitioner {
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
    ... ...
    public int partition(String topic, Object key, byte[] keyBytes, 
                         Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, 
                         cluster.partitionsForTopic(topic).size()); // 获得Topic下【所有分片】总数
    }
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                         byte[] valueBytes, Cluster cluster, int numPartitions) {
        // 如果不存在key的序列化值
        if (keyBytes == null) 
            return stickyPartitionCache.partition(topic, cluster);
        // 对keyBytes进行哈希计算,并在获得Topic下【所有分片】中寻址
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    ... ...
}

如果keyBytes==null,在StickyPartitionCache中如何计算出分区值呢?首先,以主题topic为key,去缓存indexCache中获取分区值part,如果part不为空,则直接返回part,搞定!!

如果part等于null,则说明缓存中没有缓存该topic的分区值,那么就需要计算了,计算步骤如下所示:

步骤1】获得topic下所有分片集合partitions

步骤2】获得topic下所有有效分片集合availablePartitions

步骤3】如果不存在有效分片,则获得一个随机数,基于partitions中取余寻址;

步骤4】如果存在1个有效分片,则获取此分片值;

步骤5】如果存在多个有效分片,则获得一个随机数,基于availablePartitions中取余寻址;

步骤6】将topic分区值维护到缓存indexCache中,并返回分区值;

如下则是partition方法的源码及注释,请见如下所示:

public int partition(String topic, Cluster cluster) {
    Integer part = indexCache.get(topic); // 尝试去缓存中获取,如果获取到,则直接返回
    if (part == null) 
        return nextPartition(topic, cluster, -1); // 获得某主题topic的分区号,并将其维护到缓存indexCache中
    return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    // 获得topic下所有分片集合
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic); // 尝试去缓存中获取分片号,作为旧分片oldPart
    Integer newPart = oldPart;
    if (oldPart == null || oldPart == prevPartition) {
        // 获得Topic下所有【有效分片】集合
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        // 如果不存在有效分片,则获得一个随机数,基于partitions中取余寻址
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } 
        // 如果存在1个有效分片,则分配到此处
        else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } 
        // 如果存在多个有效分片,则获得一个随机数,基于availablePartitions中取余寻址
        else {
            while (newPart == null || newPart.equals(oldPart)) {
                int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // 维护到缓存indexCache中,主题Topic为key
        if (oldPart == null) indexCache.putIfAbsent(topic, newPart);
        else indexCache.replace(topic, prevPartition, newPart);
        return indexCache.get(topic); // 获得主题Topic的分区号
    }
    return indexCache.get(topic); // 获得主题Topic的分区号
}

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

相关文章
|
21天前
|
存储 缓存 Java
什么是线程池?从底层源码入手,深度解析线程池的工作原理
本文从底层源码入手,深度解析ThreadPoolExecutor底层源码,包括其核心字段、内部类和重要方法,另外对Executors工具类下的四种自带线程池源码进行解释。 阅读本文后,可以对线程池的工作原理、七大参数、生命周期、拒绝策略等内容拥有更深入的认识。
什么是线程池?从底层源码入手,深度解析线程池的工作原理
|
1月前
|
Java Spring
运行@Async注解的方法的线程池
自定义@Async注解线程池
56 3
|
11天前
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
2月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
68 1
|
2月前
|
存储 NoSQL Java
线程池的原理与C语言实现
【8月更文挑战第22天】线程池是一种多线程处理框架,通过复用预创建的线程来高效地处理大量短暂或临时任务,提升程序性能。它主要包括三部分:线程管理器、工作队列和线程。线程管理器负责创建与管理线程;工作队列存储待处理任务;线程则执行任务。当提交新任务时,线程管理器将其加入队列,并由空闲线程处理。使用线程池能减少线程创建与销毁的开销,提高响应速度,并能有效控制并发线程数量,避免资源竞争。这里还提供了一个简单的 C 语言实现示例。
|
2月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
51 4
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
1月前
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
18 0
|
2月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
44 0
下一篇
无影云桌面