Kafka核心之Producer

简介: 本文介绍Kafka的核心之Producer。

一、旧版本producer


0.9.0.0版本以前,是由scala编写的旧版本producer。

入口类:kafka.producer.Producer

代码示例:

Properties properties = new Properties();
        properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.requird.acks", "1");
        ProducerConfig config = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","hello");
        Producer.send(msg);

旧版本是同步机制,等待响应。吞吐性很差。在0.9.0.0版本以后,正式下架了。

旧版本的方法:

send   发送
close  关闭
sync   异步发送  有丢失消息的可能性


二、新版本producer


旧版本producer由scala编写,0.9.0.0版本以后,新版本producer由java编写。

新版本主要入口类是:org.apache.kafka.clients.producer.KafkaProducer

常用方法:

send  实现消息发送主逻辑
close  关闭producer   metrics  
获取producer的实时监控指标数据 比如发送消息的速率

Kafka producer要比consumer设计简单一些,主要就是向某个topic的某个分区发送一条消息。partitioner决定向哪个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,如果没有指定key就以轮询的方式选择分区。也可以自定义分区策略。

确定分区后,producer寻找到分区的leader,也就是该leader所在的broker,然后发送消息,leader会进行副本同步ISR。

producer会启两个线程,主线程封装ProducerRecord类,序列化后发给partitioner,然后发送到内存缓冲区。

另一个I/O线程,提取消息分batch统一发送给对应的broker。

示例代码:

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();


1、构造Properties对象,bootstrap.servers key.serializer value.serializer是必须指定的。

2、使用Properties构造KafkaProducer对象。

3、构造ProducerRecord 指定topic 分区 key value。

4、KafkaProducer的send方法发送。

5、关闭KafkaProducer。


Properties主要参数:

bootstrap.servers 和consumer一样,指定部分broker即可。而且broker端如果没有配ip地址,要写成主机名。

key.serializer value.serializer 序列化参数 一定要全类名 没有key也必须设置。

acks 三个值

0: producer完全不管broker的处理结果 回调也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高

all或者-1: leader broker会等消息写入 并且ISR都写入后 才会响应,这种只要ISR有副本存活就肯定不会丢失,但吞 吐量最低。

1: 默认的值 leader broker自己写入后就响应,不会等待ISR其他的副本写入,只要leader broker存活就不会丢失,即保证了不丢失,也保证了吞吐量。

buffer.memory 缓冲区大小 字节 默认是33554432 就是发送消息的内存缓冲区大小 过小的话会影响吞吐量

compression.type 设置是否压缩消息 默认值是none 压缩后可以降低IO开销提高吞吐,但是会增大CPU开销。

支持三种: GZIP Snappy LZ4 性能 LZ4 > Snappy > GZIP

retries 发送消息重试的次数 默认0 不重试 重试可能造成重复发送 可能造成乱序

retry.backoff.ms 设置重试间隔 默认100毫秒

batch.size 调优重要的参数 batch小 吞吐量也会小 batch大 内存压力会大 默认值是16384 16KB

linger.ms 发送延时 默认是0 0的话不用等batch满就发送 延时的话可以提高吞吐 看具体情况进行调整

max.request.size producer能够发送最大消息的大小 默认1048576字节 如果消息很大 需要修改它

request.timeout.ms 发送请求后broker在规定时间返回 默认30秒 超过就是超时了。


Send方法

fire and forget 就是上边的示例

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

异步回调 不阻塞

Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 600; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i),new Callback(){
              public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " +       metadata.offset());
                         }
                     }           
            });
            System.out.println("testkafka"+i);
        }
        kafkaProducer.close();

同步发送 无限等待返回

producer.send(record).get()

重试机制

如果需要自定义重试机制,就要在回调里对不同异常区别对待,常见的几种如下:

可重试异常

LeaderNotAvailableException :分区的Leader副本不可用,这可能是换届选举导致的瞬时的异常,重试几次就可以恢复

NotControllerException:Controller主要是用来选择分区副本和每一个分区leader的副本信息,主要负责统一管理分区信息等,也可能是选举所致。

NetWorkerException :瞬时网络故障异常所致。

不可重试异常

SerializationException:序列化失败异常

RecordToolLargeException:消息尺寸过大导致。

示例代码:

producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e ==null){
                               //正常处理逻辑
                               System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
                           }else{
                                 if(e instanceof RetriableException) {
                                    //处理可重试异常
                                    ......
                                 } else {
                                    //处理不可重试异常
                                    ......
                                 }
                           }
                       }
                   });

分区机制

partitioner决定向哪个分区发送消息。用户指定key,默认的分区器会根据key的哈希值来选择分区,如果没有指定key就以轮询的方式选择分区。也可以自定义分区策略。

对于有key的消息,java版本的producer自带的partitioner会根据murmur2算法计算消息key的哈希值。然后对总分区数求模得到消息要被发送到的目标分区号。

自定义分区策略:

创建一个类,实现org.apache.kafka.clients.producer.Partitioner接口

主要分区逻辑在Partitioner.partition中实现:通过topic key value 一同确定分区

在构造KafkaProducer得Properties中设置partitioner.class 为自定义类 注意是全类名


序列化机制

常用的serializer

ByteArraySerializer.class

ByteBufferSerializer.class

BytesSerializer.class

DoubleSerializer.class

IntegerSerializer.class

LongSerializer.class

StringSerializer.class

但是其他一些复杂的就需要自定义序列化:

1、定义数据格式

2、创建自定义序列化类,实现org.apache.kafka.common.serialization.Serializer接口

3、在KafkaProducer的Properties中设置key.serializer value.serializer为自定义类

相关文章
|
6月前
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
505 0
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
67 4
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
3月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
146 1
|
3月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
88 4
|
3月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
80 2
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
73 8
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
68 3
|
3月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
101 0
下一篇
无影云桌面