Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】

1.生产者发送消息的过程及生产者设计

1.1 消息发送过程

生产者发送消息的过程描述:

  • Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。为了使其能够在网络上传输,在发送 ProducerRecord 对象前生产者会把键和值对象序列化成字节数组。
  • 接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 Broker 上。
  • 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

在这里插入图片描述

1.2 生产者设计

  • 负载均衡(Partition 会均衡分布到不同 Broker 上)

由于消息Topic 由多个 Partition 组成,且 Partition 会均衡分布到不同 Broker 上,因此,为了有效利用 Broker 集群的性能,提高消息的吞吐量,Producer 可以通过随机或者 Hash 等方式,将消息平均发送到多个 Partition 上,以实现负载均衡。

  • 批量发送

是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给 Broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。

  • 压缩( GZIP 或 Snappy )

Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。

2.创建生产者

2.1 项目依赖

本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 kafka-clients 依赖,如下:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.2.0</version>
</dependency>

2.2 创建生产者

创建 Kafka 生产者时,以下三个属性是必须指定的:

  • bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
  • key.serializer :指定键的序列化器;
  • value.serializer :指定值的序列化器。

创建的示例代码如下:

public class SimpleProducer {
   
   
    public static void main(String[] args) {
   
   
        String topicName = "Hello-Kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "tcloud:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*创建生产者*/
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
   
   
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Hello World " + i);
            /* 发送消息*/
            producer.send(record);
        }
        /*关闭生产者*/
        producer.close();
    }
}

2.3 测试

  1. 启动Kakfa

Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties
  1. 创建topic
# 创建用于测试主题
bin/kafka-topics.sh --create \
          --bootstrap-server tcloud:9092 \
          --replication-factor 1 \
          --partitions 1 \
          --topic Hello-Kafka
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server tcloud:9092
  1. 启动消费者

启动一个控制台消费者用于观察写入情况,启动命令如下:

# kafka-console-consumer.sh --bootstrap-server tcloud:9092 --topic Hello-Kafka --from-beginning
  1. 运行项目

此时可以看到消费者控制台,输出如下,这里 kafka-console-consumer 只会打印出值信息,不会打印出键信息。

在这里插入图片描述

2.4 可能出现的问题

在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下,此时需要对 server.properties 文件中的 listeners 配置进行更改:

# tcloud 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://tcloud:9092

3.发送消息

上面的示例程序调用了 send 方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。

3.1 同步发送

在调用 send 方法后可以接着调用 get() 方法, send 方法的返回值是一个Future对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

public class SimpleProducerSyn {
   
   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
   
   
        String topicName = "Hello-Kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*创建生产者*/
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
   
   
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Hello Kafka " + i);
            /* 同步发送消息*/
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());
        }
        /*关闭生产者*/
        producer.close();
    }
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 Hello-Kafka 主题时候,使用 --partitions 指定其分区数为 1,即只有一个分区。

topic=Hello-Kafka, partition=0, offset=10 
topic=Hello-Kafka, partition=0, offset=11 
topic=Hello-Kafka, partition=0, offset=12 
topic=Hello-Kafka, partition=0, offset=13 
topic=Hello-Kafka, partition=0, offset=14 
topic=Hello-Kafka, partition=0, offset=15 
topic=Hello-Kafka, partition=0, offset=16 
topic=Hello-Kafka, partition=0, offset=17 
topic=Hello-Kafka, partition=0, offset=18 
topic=Hello-Kafka, partition=0, offset=19

3.2 异步发送

通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

public class SimpleProducerAsyn {
   
   
    public static void main(String[] args) {
   
   
        String topicName = "Hello-Kafka";
        Properties props = new Properties();
        props.put("bootstrap.servers", "tcloud:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*创建生产者*/
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
   
   
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "Hello Kafka " + i);
            /* 异步发送消息,并监听回调*/
            producer.send(record, new Callback() {
   
   
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
   
   
                    if (exception != null) {
   
   
                        System.out.println("进行异常处理");
                    } else {
   
   
                        System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());
                    }
                }
            });
        }
        /*关闭生产者*/
        producer.close();
    }
}

4.自定义分区器

Kafka 有着默认的分区机制:

  • 如果键值为 null, 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上;
  • 如果键值不为 null,那么 Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。

某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:

4.1 自定义分区器

public class CustomPartitioner implements Partitioner {
   
   
    private int passLine;

    @Override
    public void configure(Map<String, ?> configs) {
   
   
        /*从生产者配置中获取分数线*/
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
   
   
        /*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
   
   
        System.out.println("分区器关闭");
    }
}

需要在创建生产者时指定分区器,和分区器所需要的配置参数:

public class ProducerWithPartitioner {
   
   
    public static void main(String[] args) {
   
   
        String topicName = "Kafka-Partitioner-Test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "tcloud:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*传递自定义分区器*/
        props.put("partitioner.class", "com.yz.kafka.producers.CustomPartitioner");
        /*传递分区器所需的参数*/
        props.put("pass.line", 6);
        Producer<Integer, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i <= 10; i++) {
   
   
            String score = "score:" + i;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
            /*异步发送消息*/
            producer.send(record, (metadata, exception) -> System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
        }
        producer.close();
    }
}

4.2 测试

需要创建一个至少有两个分区的主题【当前创建的是 :two: 个分区】:

bin/kafka-topics.sh --create \
          --bootstrap-server tcloud:9092 \
          --replication-factor 1 --partitions 2 \
          --topic Kafka-Partitioner-Test

此时输入如下,可以看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。

score:6, partition=1, 
score:7, partition=1, 
score:8, partition=1, 
score:9, partition=1, 
score:10, partition=1, 
score:0, partition=0, 
score:1, partition=0, 
score:2, partition=0, 
score:3, partition=0, 
score:4, partition=0, 
score:5, partition=0,

5.自定义序列化器

为什么要自定义序列化器?我们可用先看一下 Kafka 自带的序列化器:

在这里插入图片描述
可以发现都是简单类型的,如果我们要存储的对象是自己封装的那就没有相应的序列化器了,需要自己进行定义。

5.1 自定义对象及自定义序列化器

// 自定义对象
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Student {
   
   
    String name;
    String address;
}

// 自定义序列化器()
public class CustomStudentSerializer implements Serializer<Student> {
   
   

    @Override
    public void configure(Map<String, ?> map, boolean b) {
   
   
    }

    @Override
    public byte[] serialize(String s, Student student) {
   
   
        if (student == null) {
   
   
            return null;
        }
        byte[] name, address;
        if (student.getName() != null) {
   
   
            name = student.getName().getBytes(StandardCharsets.UTF_8);
        } else {
   
   
            name = new byte[0];
        }
        if (student.getAddress() != null) {
   
   
            address = student.getAddress().getBytes(StandardCharsets.UTF_8);
        } else {
   
   
            address = new byte[0];
        }
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(address.length);
        buffer.put(address);
        return buffer.array();
    }

    @Override
    public void close() {
   
   
    }
}

5.2 自定义序列化器使用

public class ProducerWithSerializer {
   
   
    public static void main(String[] args) {
   
   
        String topicName = "Kafka-Serializer-Test";
        Properties properties = new Properties();
        // 设置key序列号器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置value序列号器(自定义对象使用自定义的序列化器)
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomStudentSerializer.class.getName());
        // 设置集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "tcloud:9092");
        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 2);

        KafkaProducer<String, Student> producer = new KafkaProducer<>(properties);
        Student student = Student.builder().name("卡夫卡").address("郑州").build();
        ProducerRecord<String, Student> record = new ProducerRecord<>(topicName, "Student NO.1 ", student);
        try {
   
   
            /*同步发送消息*/
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset());
        } catch (Exception e) {
   
   
            e.printStackTrace();
        }
        producer.close();
    }
}

5.3 测试

创建主题:

bin/kafka-topics.sh --create \
          --bootstrap-server tcloud:9092 \
          --replication-factor 1 \
          --topic Kafka-Serializer-Test

执行生产者程序 ProducerWithSerializer 的 main 函数,打印结果:

topic=Kafka-Serializer-Test, partition=0, offset=0

开启消费者查看一下数据:

kafka-console-consumer.sh --bootstrap-server tcloud:9092 \
--topic Kafka-Serializer-Test \
--from-beginning \
--property print.key=true

在这里插入图片描述
我们不使用自定义的序列化器,而是自带的 StringSerializer 测试一下,结果是报 ClassCastException 异常的:

org.apache.kafka.common.errors.SerializationException: 
Can't convert value of class com.yz.kafka.producers.Student to class 
org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: 
com.yz.kafka.producers.Student cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:878)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:840)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:727)
    at com.yz.kafka.producers.ProducerWithSerializer.main(ProducerWithSerializer.java:32)

6.生产者其他属性

上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有很多可配置属性,如下:

  1. acks
    acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:
    acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
    acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
    acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  2. buffer.memory
    设置生产者内存缓冲区的大小。
  3. compression.type
    默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。
  4. retries
    发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。
  5. batch.size
    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
  6. linger.ms
    该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。
  7. clent.id
    客户端 id,服务器用来识别消息的来源。
  8. max.in.flight.requests.per.connection
    指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。
  9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
    timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;
    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;
    metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。
  10. max.block.ms
    指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms时,生产者会抛出超时异常。
  11. max.request.size
    该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。
  12. receive.buffer.bytes & send.buffer.byte
    这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。
目录
相关文章
|
21天前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 分区
【4月更文挑战第5天】【Kafka】Kafka 分区
|
14天前
|
消息中间件 监控 Kafka
【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
【4月更文挑战第12天】【Kafka】分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
|
19天前
|
消息中间件 存储 负载均衡
【Kafka】Kafka 的分区分配策略分析
【4月更文挑战第7天】【Kafka】Kafka 的分区分配策略分析
|
19天前
|
消息中间件 监控 Kafka
【Kafka】Kafka 分区Leader选举策略
【4月更文挑战第7天】【Kafka】Kafka 分区Leader选举策略
|
29天前
|
消息中间件 存储 Kafka
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿一下RocketMQ和Kafka索引设计原理和方案
48 1
|
1月前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
190 2
|
1月前
|
消息中间件 网络协议 Kafka
Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
【2月更文挑战第21天】Kafka【付诸实践 02】消费者和消费者群组+创建消费者实例+提交偏移量(自动、手动)+监听分区再平衡+独立的消费者+消费者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka消费者】
81 3
|
2月前
|
消息中间件 安全 Kafka
2024年了,如何更好的搭建Kafka集群?
我们基于Kraft模式和Docker Compose同时采用最新版Kafka v3.6.1来搭建集群。
438 2
2024年了,如何更好的搭建Kafka集群?
|
3月前
|
消息中间件 存储 数据可视化
kafka高可用集群搭建
kafka高可用集群搭建
44 0
|
6月前
|
消息中间件 存储 Kubernetes
Helm方式部署 zookeeper+kafka 集群 ——2023.05
Helm方式部署 zookeeper+kafka 集群 ——2023.05
243 0