初识 Kafka Producer 生产者

简介: 初识 Kafka Producer 生产者

1、KafkaProducer 概述


根据 KafkaProducer 类上的注释上来看 KafkaProducer 具有如下特征:


  • KafkaProducer 是线程安全的,可以被多个线程交叉使用。
  • KafkaProducer 内部包含一个缓存池,存放待发送消息,即 ProducerRecord 队列,与此同时会开启一个IO线程将 ProducerRecord 对象发送到 Kafka 集群。
  • KafkaProducer 的消息发送 API send 方法是异步,只负责将待发送消息 ProducerRecord 发送到缓存区中,立即返回,并返回一个结果凭证 Future。
  • acks
    KafkaProducer 提供了一个核心参数 acks 用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下:
  • 0
    表示生产者不关系该条消息在 broker 端的处理结果,只要调用 KafkaProducer 的 send 方法返回后即认为成功,显然这种方式是最不安全的,因为 Broker 端可能压根都没有收到该条消息或存储失败。
  • all 或 -1
    表示消息不仅需要 Leader 节点已存储该消息,并且要求其副本(准确的来说是 ISR 中的节点)全部存储才认为已提交,才向客户端返回提交成功。这是最严格的持久化保障,当然性能也最低。
  • 1
    表示消息只需要写入 Leader 节点后就可以向客户端返回提交成功。


  • retries
    kafka 在生产端提供的另外一个核心属性,用来控制消息在发送失败后的重试次数,设置为 0 表示不重试,重试就有可能造成消息在发送端的重复。
  • batch.size
    kafka 消息发送者为每一个分区维护一个未发送消息积压缓存区,其内存大小由batch.size指定,默认为 16K。
    但如果缓存区中不足100条,但发送线程此时空闲,是需要等到缓存区中积满100条才能发送还是可以立即发送呢?默认是立即发送,即 batch.size 的作用其实是客户端一次发送到broker的最大消息大小。
  • linger.ms
      为了提高 kafka 消息发送的高吞吐量,即控制在缓存区中未积满 batch.size  时来控制 消息发送线程的行为,是立即发送还是等待一定时间,如果linger.ms 设置为 0表示立即发送,如果设置为大于0,则消息发送线程会等待这个值后才会向broker发送。该参数会增加响应时间,但有利于增加吞吐量。有点类似于 TCP 领域的 Nagle 算法。
  • buffer.memory
    用于控制消息发送者缓存的总内存大小,如果超过该值,往缓存区中添加消息会被阻塞,具体会在下文的消息发送流程中详细介绍,阻塞的最大时间可通过参数 max.block.ms 设置,阻塞超过该值会抛出超时异常。
  • key.serializer
    指定 key 的序列化处理器。
  • value.serializer
    指定 消息体的序列化处理器。
  • enable.idempotence
    从 kafka0.11版本开始,支持消息传递幂等,可以做到消息只会被传递一次,通过 enable.idempotence 为 true 来开启。如果该值设置为 true,其 retries 将设置为 Integer.MAX_VALUE,acks 将被设置为 all。为了确保消息发送幂等性,必须避免应用程序端的任何重试,并且如果消息发送API如果返回错误,应用端应该记录最后成功发送的消息,避免消息的重复发送。


从Kafka 0.11开始,kafka 也支持事务消息。


2、KafkaProducer 类图


38a66263683703130af96cc48eb1a33a.jpg

在 Kafka  中,生产者通过接口 Producer 定义,通过该接口的方法,我们基本可以得知 KafkaProducer 将具备如下基本能力:


  • void initTransactions()
    初始化事务,如果需要使用事务方法,该方法必须首先被调用。
  • void beginTransaction()
    开启事务。
  • void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
    向消费组提交当前事务中的消息偏移量,将在介绍 Kafka 事务相关文章中详细介绍。
  • void commitTransaction()
    提交事务。
  • void abortTransaction()
    回滚事务。
  • Future< RecordMetadata> send(ProducerRecord
    record)
    消息发送,该方法默认为异步发送,如果要实现同步发送的效果,对返回结果调用  get 方法即可,该方法将在下篇文章中详细介绍。
  • Future< RecordMetadata> send(ProducerRecord
    record, Callback callback)
     消息发送,支持回调。
  • void flush()
    忽略 linger.ms 的值,直接唤醒发送线程,将缓冲区中的消息全部发送到 broker。
  • List< PartitionInfo> partitionsFor(String topic)
    获取 topic 的路由信息(分区信息)。
  • Map< MetricName, ? extends Metric> metrics()
    获取由生产者收集的统计信息。
  • void close()
    关闭发送者。
  • void close(Duration timeout)
    定时关闭消息发送者。


上面的方法我们会根据需要在后续文章中进行详细的介绍。接下来我们看一下 KafkaProducer 的核心属性的含义。


  • String clientId
    客户端ID。在创建 KafkaProducer 时可通过 client.id 定义 clientId,如果未指定,则默认 producer- seq,seq 在进程内递增,强烈建议客户端显示指定 clientId。
  • Metrics metrics
    度量的相关存储容器,例如消息体大小、发送耗时等与监控相关的指标。
  • Partitioner partitioner
    分区负载均衡算法,通过参数 partitioner.class 指定。
  • int maxRequestSize
    调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。
  • long totalMemorySize
    生产者缓存所占内存的总大小,通过参数 buffer.memory 设置。
  • Metadata metadata
    元数据信息,例如 topic 的路由信息,由 KafkaProducer 自动更新。
  • RecordAccumulator accumulator
    消息记录累积器,将在消息发送部分详细介绍。
  • Sender sender
    用于封装消息发送的逻辑,即向 broker 发送消息的处理逻辑。
  • Thread ioThread
    用于消息发送的后台线程,一个独立的线程,内部使用 Sender 来向 broker 发送消息。
  • CompressionType compressionType
    压缩类型,默认不启用压缩,可通过参数 compression.type 配置。可选值:none、gzip、snappy、lz4、zstd。
  • Sensor errors
    错误信息收集器,当成一个 metrics,用来做监控的。
  • Time time
    用于获取系统时间或线程睡眠等。
  • Serializer< K> keySerializer
    用于对消息的 key 进行序列化。
  • Serializer< V> valueSerializer
    对消息体进行序列化。
  • ProducerConfig producerConfig
    生产者的配置信息。
  • long maxBlockTimeMs
    最大阻塞时间,当生产者使用的缓存已经达到规定值后,此时消息发送会阻塞,通过参数 max.block.ms 来设置最多等待多久。
  • ProducerInterceptors
    interceptors
    生产者端的拦截器,在消息发送之前进行一些定制化处理。
  • ApiVersions apiVersions
    维护 api 版本的相关元信息,该类只能在 kafka 内部使用。
  • TransactionManager transactionManager
    kafka 消息事务管理器。
  • TransactionalRequestResult initTransactionsResult
    kafka 生产者事务上下文环境初始结果。

经过上面的梳理,详细读者朋友对 KafkaProducer 消息生产者有了一个大概的认识,下一篇会重点介绍消息发送流程。接下来我们以一个简单的示例结束本文的学习。


3、KafkaProducer 简单示例


package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                Future<RecordMetadata>  future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
                RecordMetadata recordMetadata = future.get();
                System.out.printf("offset:" + recordMetadata.offset());
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}


本文就介绍到这里,其主要的目的是了解Kafka 的 Producer,引出后续需要学习的内容,下一篇将重点讲述 Kafka 消息的发送流程,敬请关注。


相关文章
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
68 2
|
2月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
71 4
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
41 1
|
3月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
57 0
|
4月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
163 1
|
4月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
96 4
|
4月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
4月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
88 2