探究Kafka原理-3.生产者消费者API原理解析(上)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 探究Kafka原理-3.生产者消费者API原理解析

API 开发:producer 生产者


生产者 api 示例


一个正常的生产逻辑需要具备以下几个步骤


(1)配置生产者参数及创建相应的生产者实例

(2)构建待发送的消息

(3)发送消息

(4)关闭生产者实例


首先,引入 maven 依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.3.1</version>
</dependency>

采用默认分区方式将消息散列的发送到各个分区当中

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/*
kafka生产者api代码示例
*/
public class MyProducer {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    //设置 kafka 集群的地址 必选
        props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");
        //ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的 消息发送,应答级别
        props.put("acks", "all");
        //序列化器 因为业务数据有各种类型的,但是kafka底层存储里面不可能有各种类型的,只能是序列化的字节,所以不管你要发什么东西给它,都要提供一个序列化器,帮你能够把key value序列化成二进制的字节
        // 因为kafka底层的存储是没有类型维护机制的,用户所发的所有数据类型,都必须 序列化成byte[],所以kafka的producer需要一个针对用户所发送的数据类型的序列化工具类,且这个序列化工具类,需要实现kafka所提供的序列工具接口。
        props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
        /*
        需要额外的指定泛型,key value
        */
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            // 其调用是异步的,数据的发送动作在producer的底层是异步线程的
          producer.send(new ProducerRecord<String, String>("test",
        Integer.toString(i), "dd:"+i));
        // 在这里面可以通过逻辑判断去指定发送到那个topic中
        //Thread.sleep(100);
        producer.close();
  }
}

消息对象 ProducerRecord,除了包含业务数据外,还包含了多个属性:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

其发送方法中,根据参数的不同,有不同的构造方法

其实这样也就意味着我们可以把消息发送到不同的topic。


必要的参数配置


Kafka 生产者客户端 KakaProducer 中有 3 个参数是必填的。

bootstrap.servers / key.serializer / value.serializer

为了防止参数名字符串书写错误,可以使用如下方式进行设置:

pro.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
pro.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


发送消息


创建生产者实例和构建消息之后 就可以开始发送消息了。发送消息主要有 3 种模式:


发后即忘( fire-and-forget)


发后即忘,它只管往 Kafka 发送,并不关心消息是否正确到达。


在大多数情况下,这种发送方式没有问题;


不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。


这种发送方式的性能最高,可靠性最差。

Future<RecordMetadata> send = producer.send(rcd);


同步发送(sync )


try {
  producer.send(rcd).get();
} catch (Exception e) {
  e.printStackTrace();
}

因为Future的get方法是同步阻塞的。


异步发送(async )


回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。


注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
    // Kafka 服务端的主机名和端口号
        props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");
    // 等待所有副本节点的应答
        props.put("acks", "all");
    // 消息发送最大尝试次数
        props.put("retries", 0);
    // 一批消息处理大小
        props.put("batch.size", 16384);
    // 增加服务端请求延时
        props.put("linger.ms", 1);
    // 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
    // key 序列化
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    // value 序列化
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("test", "hello" + i),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception exception) {
                            if (metadata != null) {
                                System.out.println(metadata.partition()+ "-"+ metadata.offset());
                            }
                        }
                    });
        }
        kafkaProducer.close();
    }
}


API 开发:consumer 消费


import org.apache.kafka.clients.consumer.*;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
    // 定义 kakfa 服务的地址,不需要将所有 broker 指定上
        // 客户端只要知道一台服务器,就能通过这一台服务器来获知整个集群的信息(所有的服务器、主机名等)
        // 如果你只填写一台,万一,你得客户端启动的时候,宕机了不在线,那就无法连接到集群了
        // 如果你填写了堕胎,有一个好处就是,万一连不上其中一个,可以去连接其它的
        props.put("bootstrap.servers", "doitedu01:9092");
    // 制定 consumer group
        props.put("group.id", "g1");
        // 按照一个时间间隔自动去提交偏移量
    // 是否自动提交 offset
        props.put("enable.auto.commit", "true");
    // 自动提交 offset 的时间间隔
        props.put("auto.commit.interval.ms", "1000");
    // key 的反序列化类
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
    // value 的反序列化类
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // kafka的消费者,默认是从属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略确定消费起始偏移量
    // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
        /*
        earliest 自动重置到每个分区的最前一条消息
        latest   自动重置到每个分区的最新一条消息
        none   没有重置策略
        */
        props.put("auto.offset.reset","earliest");
    // 定义 consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 消费者订阅的 topic, 可同时订阅多个
        // subscribe订阅,是需要参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区
        // 只要消费者组里的消费者 变化了 就要发生再均衡
        consumer.subscribe(Arrays.asList("first", "test","test1"));
        // 显式指定消费起始偏移量(如果同时设置了消费者 偏移策略的话,以手动指定的为准)
        // 在设置消费分区起始偏移量这里,存在一个点,如果此时到这里了然后消费者组再均衡机制还没有做完,那么就会报错,因为可能这个消费者还没有被分配到这个分区  针对这个问题,其实动态再分配是有一个过程 和 时间的,谁也不知道要等多久,所以最好想的sleep就不容易实现了。
        想要解决这个问题有两种办法
            1.在这个过程中 拉一次数据,能拉到就代表再均衡机制完成了 consumer.poll(Long.MAX_VALVE);这里是无意义的拉一次数据,主要是为了确保分区分配已完成,然后就能够去定位偏移量了。但是这种方式不符合最初的设计初衷,如果是使用subscribe来订阅主题,那就意味着是应该参与这个组的均衡的,参与了,那就不要去指定组的偏移量了,应该听从组的分配。
            2.既然要自己指定一个确定的起始消费位置,那通常隐含之意就是不需要去参与消费者组的自动再均衡机制那么就不要使用subscribe来订阅主题
            consumer.assign(Arrays.asList(new TopicPartition("ddd",0))) 使用这个是不参与消费者的自动再均衡的。
        //TopicPartition first0 = new TopicPartition("first",0);
        //TopicPartition first1 = new TopicPartition("first",1);
        //consumer.seek(first0,10);
        //consumer.seek(first1,15);
        /*
        kafka消费者的起始消费位置有两种决定机制
        1.手动指定了起始位置,它肯定从你指定的位置开始
        2.如果没有手动指定位置,它会在找消费组之前所记录的偏移量开始
        3.如果之前的位置也获取不到,就看参数 : auto.offset.reset 所指定的重置策略
        */
        while (true) {
    // 读取数据,读取超时时间为 100ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // ConsumerRecord中,不光有用户的业务数据,还有kafka塞入的元数据
                String key = record.key();
              String value = record.value();
                // 本条数据所属的topic
              String topic = record.topic();
              // 本条数据所属的分区
              int partition = record.partition
                // 本条数据的offset
                long offset = record.offset();
                // 当前这条数据所在分区的leader的朝代纪年
              Optional<Integer> leaderEpoch = record.leaderEpoch();
              // 在kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据,timestamp就是其中之一:记录本条数据的时间戳,但是时间戳有两种类型,本条数据的创建时间(生产者)、本条数据的追加时间(broker写入log文件的时间)
                TimestampType timestampType = record.timestampType();
              long timestamp = record.timestamp();
                // 数据头,是生产者在写入数据时附加进去的(相当于用户自己的元数据)
              // 在生产者发送数据的时候,有一个构造方法可以允许你自己携带自己的 headers
              Headers headers = record.headers();
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
                        record.key(), record.value());
        }
    }
}

如果消息还没生产到指定的位置呢?这是一个很有趣的问题,到底是等,还是报错

kafka-console-consumer.sh --bootstrap-server doit01:9092 --topic test --offset 100000 --partition 0  

假设分区0 中并没有offset >= 100000 的消息,执行之后,并不会报错,但是如果超标了,就会自动重置到最新的(lastest)。


如果如果指定的offset大于最大可用的offset,那么就会定义到最后一条消息。


subscribe 订阅主题


subscribe 有如下重载方法:

public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

通过这几个构造函数来看,其中有ConsumerRebalanceListener listener 其实就是 再均衡 的监听器,再均衡的过程中,会调用这个方法。

Properties props = new Properties();
// 从配置文件中加载写好的参数
props.load(Consumer.class.getClassLoader.getResourceAsStream("consumer.properties"));
// 手动set一些参数进去
props.setProperty();
......
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
// reb-1 主题 3个分区
// reb-2 主题 2个分区
consumer.subscribe(Arrays.asList("reb-1","reb-2"),new ConsumerRebalanceListener(){
    // 再均衡分配过程中,消费者会取消先前所分配的主题、分区
    // 取消了之后,consumer会调用下面的方法
    public void onPartitionsRevoked(Collection<TopicPartition> partitions){
    }
    // 再均衡过程中,消费者会重新分配到新的主题、分区
    // 分配了新的主题 和 分区之后,consumer底层会调用下面的方法
    public void onPartitionAssigned(Collection<TopicPartition> partitions){
    }
});
但是以上的过程 懒加载,只有消费者真正 开始 poll的时候,才会实现再均衡分配的过程。

现有的再均衡原则就是每次有消费者增减 都会重新分配,其实就是先全部取消,然后又重新分配了呢,这过程中肯定存在消耗,得先把工作暂停,把偏移量记好,另外一个人接手的时候,还需要另外去读偏移量,重新从对应的位置开始。


而在kafka2.4.1中解决了这个重分配的问题。但是大多数使用的框架没有到这个版本,或者所使用的如spark flink等底层所依赖的kafka没有2.4.1这个版本。


消费者组再均衡分区分配策略


消费者组的意义何在?为了提高数据处理的并行度!

会触发 rebalance 的事件可能是如下任意一种:


  • 有新的消费者加入消费组。
  • 有消费者宕机下线,消费者并不一定需要真正下线,例如遇到长时间的 GC 、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者己下线。
  • 有消费者主动退出消费组(发送 LeaveGroupRequest 请求):比如客户端调用了 unsubscrible()方法取消对某些主题的订阅。
  • 消费组所对应的 GroupCoorinator 节点发生了变更。
  • 消费组内所订阅的任一主题或者主题的分区数量发生变化。


将分区的消费权从一个消费者移到另一个消费者称为再均衡(rebalance),如何 rebalance 也涉及到分区分配策略。


kafka 有两种的分区分配策略:range(默认) 和 round robin(新版本中又新增了另外 2 种)


我们可以通过 partition.assignment.strategy 参数选择 range 或 roundrobin。


partition.assignment.strategy 参数默认的值是 range。


partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor


partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor


这个参数属于“消费者”参数!


Range Strategy


  • 先将消费者按照 client.id 字典排序,然后按 topic 逐个处理;
  • 针对一个 topic,将其 partition 总数/消费者数得到 商 n 和 余数 m,则每个 consumer 至少分到 n个分区,且前 m 个 consumer 每人多分一个分区;


举例说明 2:假设有 TOPIC_A 有 5 个分区,由 3 个 consumer(C1,C2,C3)来消费;


5/3 得到商 1,余 2,则每个消费者至少分 1 个分区,前两个消费者各多 1 个分区 C1: 2 个分区,C2:2 个分区, C3:1 个分区


接下来,就按照“区间”进行分配:

C1: TOPIC_A-0 TOPIC_A-1
C2: TOPIC_A-2 TOPIC_A_3
C3: TOPIC_A-4

举例说明 2:假设 TOPIC_A 有 5 个分区,TOPIC_B 有 3 个分区,由 2 个 consumer(C1,C2)来消费

先分配 TOPIC_A:


5/2 得到商 2,余 1,则 C1 有 3 个分区,C2 有 2 个分区,得到结果

C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2
C2: TOPIC_A-3 TOPIC_A-4

再分配 TOPIC_B:


3/2 得到商 1,余 1,则 C1 有 2 个分区,C2 有 1 个分区,得到结果

C1: TOPIC_B-0 TOPIC_B-1
C2: TOPIC_B-2

最终分配结果:

C1: TOPIC_A-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-0 TOPIC_B-1
C2: TOPIC_A-3 TOPIC_A-4 TOPIC_B-2

如果共同订阅的主题很多,那也就意味着,排在前面的消费者拿到的分区会明显多余排在后面的。


而消费者本身有一个id,是根据id号去排序


以上就是该种模式的弊端,其实就是一个topic一个topic去分的。这个问题尤其是在订阅多个topic的时候最明显,分配单个topic的情况,也就多一个分区。


Round-Robin Strateg


将所有主题分区组成 TopicAndPartition 列表,并对 TopicAndPartition 列表按照其 hashCode 排序,然后,以轮询的方式分配给各消费者。


以上述问题来举例:


先对 TopicPartition 的 hashCode 排序,假如排序结果如下:


TOPIC_A-0 TOPIC_B-0 TOPIC_A-1 TOPIC_A-2 TOPIC_B-1 TOPIC_A-3 TOPIC_A-4 TOPIC_B-


然后按轮询方式分配


C1: TOPIC_A-0 TOPIC_A-1 TOPIC_B-1 TOPIC_A-4


C2: TOPIC_B-0 TOPIC_A-2 TOPIC_A-3 TOPIC_B-2


Sticky Strategy


对应的类叫做: org.apache.kafka.clients.consumer.StickyAssignor


sticky 策略的特点:


  • 要去打成最大化的均衡
  • 尽可能保留各消费者原来分配的分区


再均衡的过程中,还是会让各消费者先取消自身的分区,然后再重新分配(只不过是分配过程中会尽量让原来属于谁的分区依然分配给谁)


以一个例子来看

---开始
C1:A-P0   B-P1  B-P2
C2:B-P0   A-P1
---加入C3后再分配
Range Strategy
C1:A-P0   A-P1
C2:B-P0   B-P2
C3:B-P1
Sticky Strategy
C1:A-P0   B-P1
C2:B-P0   A-P1
C3:B-P2
--


探究Kafka原理-3.生产者消费者API原理解析(下):https://developer.aliyun.com/article/1413719

目录
相关文章
|
20天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
51 2
|
24天前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
16 1
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
61 5
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
34 1
|
2月前
|
消息中间件 安全 Kafka
Kafka支持SSL/TLS协议技术深度解析
SSL(Secure Socket Layer,安全套接层)及其继任者TLS(Transport Layer Security,传输层安全)是为网络通信提供安全及数据完整性的一种安全协议。这些协议在传输层对网络连接进行加密,确保数据在传输过程中不被窃取或篡改。
172 0
|
3月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
79 0
|
3月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
6天前
|
JSON API 数据格式
淘宝 / 天猫官方商品 / 订单订单 API 接口丨商品上传接口对接步骤
要对接淘宝/天猫官方商品或订单API,需先注册淘宝开放平台账号,创建应用获取App Key和App Secret。之后,详细阅读API文档,了解接口功能及权限要求,编写认证、构建请求、发送请求和处理响应的代码。最后,在沙箱环境中测试与调试,确保API调用的正确性和稳定性。
|
18天前
|
供应链 数据挖掘 API
电商API接口介绍——sku接口概述
商品SKU(Stock Keeping Unit)接口是电商API接口中的一种,专门用于获取商品的SKU信息。SKU是库存量单位,用于区分同一商品的不同规格、颜色、尺寸等属性。通过商品SKU接口,开发者可以获取商品的SKU列表、SKU属性、库存数量等详细信息。
|
19天前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应

推荐镜像

更多