探究Kafka原理-3.生产者消费者API原理解析(下)

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 探究Kafka原理-3.生产者消费者API原理解析

探究Kafka原理-3.生产者消费者API原理解析(上):https://developer.aliyun.com/article/1413716


Cooperative Sticky Strategy


对应的类叫做: org.apache.kafka.clients.consumer.ConsumerPartitionAssignor(最新的一种 2.4.1)

sticky 策略的特点:


  • 逻辑与 sticky 策略一致
  • 支持 cooperative 再均衡机制(再均衡的过程中,不会让所有消费者取消掉所有分区然后再进行重分配,影响到谁,就针对那个消费者进行即可)


消费者组再均衡流程


消费组在消费数据的时候,有两个角色进行组内的各事务的协调;


  • 角色 1: Group Coordinator (组协调器) 位于服务端(就是某个 broker)
  • 角色 2: Group Leader (组长) 位于消费端(就是消费组中的某个消费者)


GroupCoordinator 介绍


每个消费组在服务端对应一个 GroupCoordinator 其进行管理,GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。


消费者客户端中由 ConsumerCoordinator 组件负责与 GroupCoordinator 行交互;


ConsumerCoordinator 和 GroupCoordinator 最重要的职责就是负责执行消费者 rebalance 操作


eager 协议再均衡步骤细节


定位 Group Coordinator


coordinator 在我们组记偏移量的__consumer_offsets 分区的 leader 所在 broker 上


查找 Group Coordinator 的方式:


先根据消费组 groupid 的 hashcode 值计算它应该所在_consumer_offsets 中的分区编号:

Utils.abc(groupId.hashCode) % groupMetadataTopicPartitionCount
groupMetadataTopicPartitionCount 为 __consumer_offsets 的 分 区 总 数 , 这 个 可 以 通 过 broker 端 参 数
offset.topic.num.partitions 来配置,默认值是 50;

找到对应的分区号后,再寻找此分区 leader 副本所在 broker 节点,则此节点即为自己的 Grouping

Coordinator;


加入组 Join The Group


此阶段的重要操作之 1:选举消费组的 leader

private val members = new mutable.HashMap[String, MemberMetadata]
var leaderid = members.keys.head
set集合本身无序的,取头部的一个,自然也是无序的

消费组 leader 的选举,策略就是:随机!


此阶段的重要操作之 2:选择分区分配策略


最终选举的分配策略基本上可以看作被各个消费者支持的最多的策略,具体的选举过程如下:


(1)收集各个消费者支持的所有分配策略,组成候选集 candidates。


(2)每个消费者从候选集 candidates 找出第一个自身支持的策略,为这个策略投上一票。


(3)计算候选集中各个策略的选票数,选票数最多的策略即为当前消费组的分配策略(如果得票一样,那就以组长的为主)。


其实,此逻辑并不需要 consumer 来执行,而是由 Group Coordinator 来执行。


组信息同步 SYNC Group


此阶段,主要是由消费组 leader 将分区分配方案,通过 Group Coordinator 来转发给组中各消费者


心跳联系 HEART BEAT


进入这个阶段之后,消费组中的所有消费者就会处于正常工作状态。


各消费者在消费数据的同时,保持与 Group Coordinator的心跳通信。


消费者的心跳间隔时间由参数 heartbeat.interval.ms 指定,默认值为 3000 ,即这个参数必须比

session.timeout.ms 参 数 设 定 的 值 要 小 ; 一 般 情 况 下 heartbeat.interval.ms 的 配 置 值 不 能 超 过session.timeout.ms 配置值的 1/3 。这个参数可以调整得更低,以控制正常重新平衡的预期时间;


如果一个消费者发生崩溃,并停止读取消息,那么 GroupCoordinator 会等待一小段时间确认这个消费者死亡之后才会触发再均衡。在这一小段时间内,死掉的消费者并不会读取分区里的消息。


这 个 一 小 段 时 间 由 session.timeout. ms 参 数 控 制 , 该 参 数 的 配 置 值 必 须 在 broker 端 参 数group.min.session.timeout. ms (默认值为 6000 ,即 6 秒)和 group.max.session. timeout. ms (默认值为 300000 ,即 5 分钟)允许的范围内


再均衡流程


eager 协议的再均衡过程整体流程如下图:

特点:再均衡发生时,所有消费者都会停止工作,等待新方案的同步


Cooperative 协议的再均衡过程整体流程如下图:

特点:cooperative 把原来 eager 协议的一次性全局再均衡,化解成了多次的小均衡,并最终达到全局均衡的收敛状态


指定集合方式订阅主题

consumer.subscribe(Arrays.asList(topicl));
consumer.subscribe(Arrays.asList(topic2))

正则方式订阅主题


如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。


正则表达式的方式订阅的示例如下

consumer.subscribe(Pattern.compile ("topic.*" ));

利用正则表达式订阅主题,可实现动态订阅;


assign 订阅主题


消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;


在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:

public void assign(Collection<TopicPartition> partitions)

这个方法只接受参数 partitions,用来指定需要订阅的分区集合。示例如下:

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;


subscribe 与 assign 的区别


通过 subscribe()方法订阅主题具有消费者自动再均衡功能 ;


在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。


assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;


其实这一点从 assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而 assign()方法却没有。


取消订阅


既然有订阅,那么就有取消订阅;


可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过subscribe( Collection)方式实现的订阅;也可以取消通过 subscribe(Pattem)方式实现的订阅,还可以取消通过 assign( Collection)方式实现的订阅。示例码如下

consumer.unsubscribe();

如果将 subscribe(Collection )或 assign(Collection)集合参数设置为空集合,作用与 unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());


消息的消费模式


Kafka 中的消费是基于拉取模式的。


消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。


Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用 poll( ) 方法, poll( )方法返回的是所订阅的主题(分区)上的一组消息。


对于 poll ( ) 方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空,如果订阅的所有分区中都没有可供消费的消息,那么 poll( )方法返回为空的消息集;


poll ( ) 方法具体定义如下:

public ConsumerRecords<K, V> poll(final Duration timeout)

超时时间参数 timeout ,用来控制 poll( ) 方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。如果消费者程序只用来单纯拉取并消费数据,则为了提高吞吐率,可以把 timeout 设置为Long.MAX_VALUE;


消费者消费到的每条消息的类型为 ConsumerRecord

public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
topic partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。
offset 表示消息在所属分区的偏移量。
timestamp 表示时间戳,与此对应的 timestampType 表示时间戳的类型。
timestampType 有两种类型 CreateTime 和 LogAppendTime ,分别代表消息创建的时间戳和消息追加
到日志的时间戳。
headers 表示消息的头部内容。
key value 分别表示消息的键和消息的值,一般业务应用要读取的就是 value ;
serializedKeySize、serializedValueSize 分别表示 key、value 经过序列化之后的大小,如果 key 为空,
则 serializedKeySize 值为 -1,同样,如果 value 为空,则 serializedValueSize 的值也会为 -1;
checksum 是 CRC32 的校验值。

示例代码片段

/**
* 订阅与消费方式 2
*/
TopicPartition tp1 = new TopicPartition("x", 0);
TopicPartition tp2 = new TopicPartition("y", 0);
TopicPartition tp3 = new TopicPartition("z", 0);
List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3);
consumer.assign(tps);
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (TopicPartition tp : tps) {
    List<ConsumerRecord<String, String>> rList = records.records(tp);
    for (ConsumerRecord<String, String> r : rList) {
            r.topic();
            r.partition();
            r.offset();
            r.value();
            //do something to process record.
    }
  }
}


指定位移消费


有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的 seek() 方法正好提供了这个功能,让我们可以追前消费或回溯消费。


seek()方法的具体定义如下:

public void seek(TopicPartiton partition,long offset)

代码示例:

public class ConsumerDemo3 指定偏移量消费 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g002");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.clas
                s.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    // 是否自动提交消费位移
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    // 限制一次 poll 拉取到的数据量的最大值
        props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // assign 方式订阅 doit27-1 的两个分区
        TopicPartition tp0 = new TopicPartition("doit27-1", 0);
        TopicPartition tp1 = new TopicPartition("doit27-1", 1);
        consumer.assign(Arrays.asList(tp0,tp1));
    // 指定分区 0,从 offset:800 开始消费 ; 分区 1,从 offset:650 开始消费
        consumer.seek(tp0,200);
        consumer.seek(tp1,250);
    // 开始拉取消息
        while(true){
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> rec : poll) {
                System.out.println(rec.partition()+","+rec.key()+","+rec.value()+","+rec.offset()
                );
            }
        }
    }
}


自动提交消费者偏移量


Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为 5 秒,此参数生效的前提是 enable. auto.commit 参数为 true。


在默认的方式下,消费者每隔 5 秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。


Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。


  • 重复消费

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。


  • 丢失消息

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:


拉取线程不断地拉取消息并存入本地缓存,比如在 BlockingQueue 中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第 m 次位移提交的时候,也就是x+6 之前的位移己经确认提交了,处理线程却还正在处理 x+3 的消息;此时如果处理线程发生了异常,待其恢复之后会从第 m 次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3 至 x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。


手动提交消费者偏移量(调用 kafka api)


自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka 中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。


很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费;


手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false ,示例如下:

props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。


  • 同步提交的方式


commitSync()方法的定义如下:

/**
* 手动提交 offset
*/
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> r : records) {
    //do something to process record.
  }
  consumer.commitSync();
}

对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用 commitSync()的另一个有参方法,具体定义如下:

public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets

示例代码如下:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> r : records) {
    long offset = r.offset();
    //do something to process record.
    TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
    consumer.commitSync(Collections.singletonMap(topicPartition,new
OffsetAndMetadata(offset+1)));
  }
}

提交的偏移量 = 消费完的 record 的偏移量 + 1


因为,__consumer_offsets 中记录的消费偏移量,代表的是,消费者下一次要读取的位置!!!


  • 异步提交方式


异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。commitAsync 方法有一个不同的重载方法,具体定义如下:

示例代码

/**
* 异步提交 offset
*/
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> r : records) {
    long offset = r.offset();
    //do something to process record.
    TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
    consumer.commitSync(Collections.singletonMap(topicPartition,new
OffsetAndMetadata(offset+1)));
    consumer.commitAsync(Collections.singletonMap(topicPartition, new
OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() {
  @Override
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        if(e == null ){
          System.out.println(map);
        }else{
          System.out.println("error commit offset");
        }
      }
    });
  }
}


手动提交位移(时机的选择)


  • 数据处理完成之前先提交偏移量


可能会发生漏处理的现象(数据丢失)


反过来说,这种方式实现了: at most once 的数据处理(传递)语义


  • 数据处理完成之后再提交偏移量


可能会发生重复处理的现象(数据重复)


反过来说,这种方式实现了: at least once 的数据处理(传递)语义


当然,数据处理(传递)的理想语义是: exactly once(精确一次)


Kafka 也能做到 exactly once(基于 kafka 的事务机制)


消费者提交偏移量方式的总结


consumer 的消费位移提交方式:


全自动

  • auto.offset.commit = true
  • 定时提交到 consumer_offsets


半自动

  • auto.offset.commit = false;
  • 然后手动触发提交 consumer.commitSync()
  • 提交到 consumer_offsets


全手动

  • auto.offset.commit = false;
  • 写自己的代码去把消费位移保存到你自己的地方 mysql/zk/redis
  • 提交到自己所涉及的存储;初始化时也需要自己去从自定义存储中查询到消费位移
目录
相关文章
|
1月前
|
消息中间件 存储 缓存
大厂面试高频:Kafka 工作原理 ( 详细图解 )
本文详细解析了 Kafka 的核心架构和实现原理,消息中间件是亿级互联网架构的基石,大厂面试高频,非常重要,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka 工作原理 ( 详细图解 )
|
1月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
76 2
|
2月前
|
存储 缓存 搜索推荐
Lazada淘宝详情API的价值与应用解析
在电商行业,数据是驱动业务增长的核心。Lazada作为东南亚知名电商平台,其商品详情API对电商行业影响深远。本文探讨了Lazada商品详情API的重要性,包括提供全面准确的商品信息、增强平台竞争力、促进销售转化、支持用户搜索和发现需求、数据驱动决策、竞品分析、用户行为研究及提升购物体验。文章还介绍了如何通过Lazada提供的API接口、编写代码及使用第三方工具实现实时数据获取。
66 3
|
2月前
|
XML JSON API
ServiceStack:不仅仅是一个高性能Web API和微服务框架,更是一站式解决方案——深入解析其多协议支持及简便开发流程,带您体验前所未有的.NET开发效率革命
【10月更文挑战第9天】ServiceStack 是一个高性能的 Web API 和微服务框架,支持 JSON、XML、CSV 等多种数据格式。它简化了 .NET 应用的开发流程,提供了直观的 RESTful 服务构建方式。ServiceStack 支持高并发请求和复杂业务逻辑,安装简单,通过 NuGet 包管理器即可快速集成。示例代码展示了如何创建一个返回当前日期的简单服务,包括定义请求和响应 DTO、实现服务逻辑、配置路由和宿主。ServiceStack 还支持 WebSocket、SignalR 等实时通信协议,具备自动验证、自动过滤器等丰富功能,适合快速搭建高性能、可扩展的服务端应用。
166 3
|
9天前
|
数据采集 JSON API
如何利用Python爬虫淘宝商品详情高级版(item_get_pro)API接口及返回值解析说明
本文介绍了如何利用Python爬虫技术调用淘宝商品详情高级版API接口(item_get_pro),获取商品的详细信息,包括标题、价格、销量等。文章涵盖了环境准备、API权限申请、请求构建和返回值解析等内容,强调了数据获取的合规性和安全性。
|
8天前
|
JSON 自然语言处理 Java
OpenAI API深度解析:参数、Token、计费与多种调用方式
随着人工智能技术的飞速发展,OpenAI API已成为许多开发者和企业的得力助手。本文将深入探讨OpenAI API的参数、Token、计费方式,以及如何通过Rest API(以Postman为例)、Java API调用、工具调用等方式实现与OpenAI的交互,并特别关注调用具有视觉功能的GPT-4o使用本地图片的功能。此外,本文还将介绍JSON模式、可重现输出的seed机制、使用代码统计Token数量、开发控制台循环聊天,以及基于最大Token数量的消息列表限制和会话长度管理的控制台循环聊天。
68 7
|
21天前
|
机器学习/深度学习 搜索推荐 API
淘宝/天猫按图搜索(拍立淘)API的深度解析与应用实践
在数字化时代,电商行业迅速发展,个性化、便捷性和高效性成为消费者新需求。淘宝/天猫推出的拍立淘API,利用图像识别技术,提供精准的购物搜索体验。本文深入探讨其原理、优势、应用场景及实现方法,助力电商技术和用户体验提升。
|
23天前
|
监控 数据管理 测试技术
API接口自动化测试深度解析与最佳实践指南
本文详细介绍了API接口自动化测试的重要性、核心概念及实施步骤,强调了从明确测试目标、选择合适工具、编写高质量测试用例到构建稳定测试环境、执行自动化测试、分析测试结果、回归测试及集成CI/CD流程的全过程,旨在为开发者提供一套全面的技术指南,确保API的高质量与稳定性。
|
1月前
|
JSON API 数据格式
二维码操作[二维码解析基础版]免费API接口教程
此接口用于解析标准二维码内容,支持通过BASE64编码或远程图片路径提交图片。请求需包含用户ID、用户KEY、图片方式及图片地址等参数,支持POST和GET方式。返回结果包括状态码和消息内容,适用于图片元素简单的二维码解析。
|
14天前
|
供应链 搜索推荐 数据挖掘
1688搜索词推荐API接口:开发应用与收益全解析
在电商数据驱动时代,1688搜索词推荐API接口为开发者、供应商及电商从业者提供强大工具,优化业务流程,提升竞争力。该接口基于1688平台的海量数据,提供精准搜索词推荐,助力电商平台优化搜索体验,提高供应商商品曝光度与销售转化率,同时为企业提供市场分析与商业洞察,促进精准决策与成本降低。通过集成此API,各方可实现流量增长、销售额提升及运营优化,推动电商行业的创新发展。
26 0

热门文章

最新文章

推荐镜像

更多