Kafka 生产者解析

本文涉及的产品
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
简介: Kafka 生产者解析

一、消息发送
1.1 数据生产流程
数据生产流程图解
image.png

Producer创建时,会创建⼀个Sender线程并设置为守护线程
⽣产消息时,内部其实是异步流程;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)
批次发送的条件为:缓冲区数据⼤⼩达到 batch.size 或者 linger.ms 达到上限,哪个先达到就算哪个
批次发送后,发往指定分区,然后落盘到 broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试
落盘到broker成功,返回⽣产元数据给⽣产者
元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回
1.2 必要的参数配置
先来看看我们一般在程序中是怎么配置的:
image.png

最常用的配置项:
image.png

1.3 拦截器
1.3.1 拦截器介绍
Producer 的拦截器(Interceptor)和 Consumer 的 Interceptor 主要⽤于实现Client端的定制化控制逻辑。
对于Producer⽽⾔,Interceptor使得⽤户在消息发送前以及Producer回调逻辑前有机会对消息做⼀些定制化需求,⽐如修改消息等。同时,Producer允许⽤户指定多个Interceptor按序作⽤于同⼀条消息从⽽形成⼀个拦截链(Interceptor Chain)。Intercetpor 的实现接⼝是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的⽅法包括:

onSend(ProducerRecord):该⽅法封装进KafkaProducer.send⽅法中,即运⾏在⽤户主线程中。Producer确保在消息被序列化以计算分区前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响⽬标分区的计算。
onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运⾏在Producer的IO线程中,因此不要在该⽅法中放⼊很重的逻辑,否则会拖慢Producer的消息发送效率。
close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。
如前所述,Interceptor可能被运⾏在多个线程中,因此在具体实现时⽤户需要⾃⾏确保线程安全。另外倘若指定了多个Interceptor,则Producer将按照指定顺序调⽤它们,并仅仅是捕获每个Interceptor可能抛出的异常记录到错误⽇志中⽽⾮在向上传递。这在使⽤过程中要特别留意。

1.3.2 自定义拦截器
自定义拦截器步骤:

实现ProducerInterceptor接⼝
在KafkaProducer的设置中设置⾃定义的拦截器
自定义拦截器 1:

public class InterceptorOne<Key, Value> implements ProducerInterceptor<Key, Value> {    private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);     @Override    public ProducerRecord<Key, Value> onSend(ProducerRecord<Key, Value> record) {        System.out.println("拦截器1---go");        // 此处根据业务需要对相关的数据作修改        String topic = record.topic();        Integer partition = record.partition();        Long timestamp = record.timestamp();        Key key = record.key();        Value value = record.value();        Headers headers = record.headers();        // 添加消息头        headers.add("interceptor", "interceptorOne".getBytes());        ProducerRecord<Key, Value> newRecord = new ProducerRecord<Key, Value>(topic,                partition, timestamp, key, value, headers);        return newRecord;    }     @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        System.out.println("拦截器1---back");        if (exception != null) {            // 如果发⽣异常,记录⽇志中            LOGGER.error(exception.getMessage());        }    }     @Override    public void close() {     }     @Override    public void configure(Map<String, ?> configs) {     }}

照着 拦截器 1 再加两个拦截器。

生产者

public class MyProducer1 {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        Map<String, Object> configs = new HashMap<>();        // 设置连接Kafka的初始连接⽤到的服务器地址        // 如果是集群,则可以通过此初始连接发现集群中的其他broker        configs.put("bootstrap.servers", "192.168.0.102:9092");        // 设置key的序列化器        configs.put("key.serializer", IntegerSerializer.class);        // 设置⾃定义的序列化类        configs.put("value.serializer", UserSerializer.class);        // 设置自定义分区器        configs.put("partitioner.class", "com.mfc.config.MyPartitioner");        // 设置拦截器        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,                "com.mfc.interceptor.InterceptorOne,"                        + "com.mfc.interceptor.InterceptorTwo,"                        + "com.mfc.interceptor.InterceptorThree");         KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);        User user = new User();        user.setUserId(1001);        user.setUsername("阿彪");         // ⽤于封装Producer的消息        ProducerRecord<Integer, User> record = new ProducerRecord<>(                "topic_1", // 主题名称                0, // 分区编号                user.getUserId(), // 数字作为key                user // user 对象作为value        );        producer.send(record, new Callback() {            @Override            public void onCompletion(RecordMetadata metadata, Exception e) {                if (e == null) {                    System.out.println("消息发送成功:" + metadata.topic() + "\t"                            + metadata.partition() + "\t"                            + metadata.offset());                } else {                    System.out.println("消息发送异常");                }            }        });         // 关闭⽣产者        producer.close();    }}

image.png

1.4 序列化器
1.4.1 Kafka 自带序列化器
Kafka使⽤org.apache.kafka.common.serialization.Serializer接⼝⽤于定义序列化器,将泛型指定类型的数据转换为字节数组。

package org.apache.kafka.common.serialization; import java.io.Closeable;import java.util.Map; /**将对象转换为byte数组的接⼝该接⼝的实现类需要提供⽆参构造器@param <T> 从哪个类型转换*/public interface Serializer<T> extends Closeable {    /*    类的配置信息    @param configs key/value pairs    @param isKey key的序列化还是value的序列化    */    void configure(Map<String, ?> var1, boolean var2);     /*    将对象转换为字节数组     @param topic 主题名称     @param data 需要转换的对象     @return 序列化的字节数组    */    byte[] serialize(String var1, T var2);     /*    关闭序列化器    该⽅法需要提供幂等性,因为可能调⽤多次。    */    void close();}

系统提供了该接⼝的⼦接⼝以及实现类:

org.apache.kafka.common.serialization.ByteArraySerializer

org.apache.kafka.common.serialization.ByteBufferSerializer

org.apache.kafka.common.serialization.BytesSerializer

org.apache.kafka.common.serialization.DoubleSerializer

org.apache.kafka.common.serialization.FloatSerializer

org.apache.kafka.common.serialization.IntegerSerializer

org.apache.kafka.common.serialization.StringSerializer

org.apache.kafka.common.serialization.LongSerializer

org.apache.kafka.common.serialization.ShortSerializer

image.png
1.4.2 自定义序列化器
数据的序列化⼀般⽣产中使⽤ avro。

⾃定义序列化器需要实现 org.apache.kafka.common.serialization.Serializer 接⼝,并实现其中的serialize⽅法。

实体类

public class User {    private Integer userId;    private String username;    // set、get方法省略}

自定义序列化器

public class UserSerializer implements Serializer<User> {    @Override    public void configure(Map<String, ?> map, boolean b) {        // do Nothing    }     @Override    public byte[] serialize(String topic, User user) {        try {            // 如果数据是null,则返回null            if (user == null) return null;            Integer userId = user.getUserId();            String username = user.getUsername();            int length = 0;            byte[] bytes = null;            if (null != username) {                bytes = username.getBytes("utf-8");                length = bytes.length;            }            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);            buffer.putInt(userId);            buffer.putInt(length);            buffer.put(bytes);            return buffer.array();        } catch (UnsupportedEncodingException e) {            throw new SerializationException("序列化数据异常");        }    }     @Override    public void close() {        // do Nothing    }}

生产者:

public class MyProducer1 {    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {        Map<String, Object> configs = new HashMap<>();        // 设置连接Kafka的初始连接⽤到的服务器地址        // 如果是集群,则可以通过此初始连接发现集群中的其他broker        configs.put("bootstrap.servers", "192.168.0.102:9092");        // 设置key的序列化器        configs.put("key.serializer", IntegerSerializer.class);        // 设置⾃定义的序列化类        configs.put("value.serializer", UserSerializer.class);         KafkaProducer<Integer, User> producer = new KafkaProducer<>(configs);        User user = new User();        user.setUserId(1001);        user.setUsername("阿彪");         // ⽤于封装Producer的消息        ProducerRecord<Integer, User> record = new ProducerRecord<>(                "topic_1", // 主题名称                0, // 分区编号                user.getUserId(), // 数字作为key                user // user 对象作为value        );        producer.send(record, new Callback() {            @Override            public void onCompletion(RecordMetadata metadata, Exception e) {                if (e == null) {                    System.out.println("消息发送成功:" + metadata.topic() + "\t"                            + metadata.partition() + "\t"                            + metadata.offset());                } else {                    System.out.println("消息发送异常");                }            }        });         // 关闭⽣产者        producer.close();    }}

1.5 分区器
1.5.1 Kafka 自带分区器
默认(DefaultPartitioner)分区计算:

如果record提供了分区号,则使⽤record提供的分区号
如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。
会⾸先在可⽤的分区中分配分区号
如果没有可⽤的分区,则在该主题所有分区中分配分区号。
看一下kafka的生产者(KafkaProducer)源码:

image.png

再看Kafka自带的默认分区器(DefaultPartitioner):

image.png

默认的分区器实现了 Partitioner 接口,先看一下接口:

public interface Partitioner extends Configurable, Closeable {     /**     * 为指定的消息记录计算分区值     *     * @param topic 主题名称     * @param key 根据该key的值进⾏分区计算,如果没有则为null     * @param keyBytes key的序列化字节数组,根据该数组进⾏分区计算。如果没有key,则为null     * @param value 根据value值进⾏分区计算,如果没有,则为null     * @param valueBytes value的序列化字节数组,根据此值进⾏分区计算。如果没有,则为null     * @param cluster 当前集群的元数据     */    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);     /**     * 关闭分区器的时候调⽤该⽅法     */    public void close(); }

1.5.2 自定义分区器
如果要⾃定义分区器,则需要

⾸先开发Partitioner接⼝的实现类
在KafkaProducer中进⾏设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
实现Partitioner接⼝⾃定义分区器:

public class MyPartitioner implements Partitioner {    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        return 0;    }     @Override    public void close() {     }     @Override    public void configure(Map<String, ?> configs) {     }}

然后在⽣产者中配置:
image.png

二、消息发送原理
原理图解:

image.png

由上图可以看出:KafkaProducer 有两个基本线程:

主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中;
消息收集器RecoderAccumulator为每个分区都维护了⼀个 Deque 类型的双端队列。
ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低⽹络影响;
由于⽣产者客户端使⽤ java.io.ByteBuffer 在发送消息之前进⾏消息保存,并维护了⼀个 BufferPool 实现 ByteBuffer 的复⽤;该缓存池只针对特定⼤⼩( batch.size 指定)的 ByteBuffer进⾏管理,对于消息过⼤的缓存,不能做到重复利⽤。
每次追加⼀条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取⼀个ProducerBatch,判断当前消息的⼤⼩是否可以写⼊该批次中。若可以写⼊则写⼊;若不可以写⼊,则新建⼀个ProducerBatch,判断该消息⼤⼩是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建⽴新的ProducerBatch,这样⽅便进⾏缓存重复利⽤;若超过,则以计算的消息⼤⼩建⽴对应的 ProducerBatch ,缺点就是该内存不能被复⽤了。
Sender线程:
该线程从消息收集器获取缓存的消息,将其处理为 <Node, List 的形式, Node 表示集群的broker节点。
进⼀步将<Node, List转化为<Node, Request>形式,此时才可以向服务端发送数据。
在发送之前,Sender线程将消息以 Map<NodeId, Deque>的形式保存到 InFlightRequests 中进⾏缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压⼒最⼩的⼀个,以实现消息的尽快发出。
三、更多生产者参数配置
1653922409(1).png

1653922434(1).png

image.png

相关文章
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
99 2
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
28 1
|
5月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
143 58
|
3月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
55 1
|
4月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
5月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
4月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
339 0
|
5月前
|
消息中间件 安全 机器人
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
【Azure 事件中心】Kafka 生产者发送消息失败,根据失败消息询问机器人得到的分析步骤
121 0
|
5月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
127 1

推荐镜像

更多