大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

简介: 大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下内容:


消费组测试,消费者变动对消费的影响

消费者的心跳机制

消费者的相关配置参数

682ff08a122671ae7e960a1d0948a640_72d9547a846a4eacb683562672ba16c6.png 主题和分区

Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库

Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍

ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。

d938a1033a8c01828ed8c7f30fdda370_8883489da0f4494a92552b729d20e10f.png

反序列化

  • Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
  • 消费者的反序列化器包括Key和Value。

自定义反序列化

如果要实现自定义的反序列化器,需要实现 Deserializer 接口:

public class UserDeserializer implements Deserializer<User> {


    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public User deserialize(String topic, byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocate(data.length);
        buffer.put(data);
        buffer.flip();
        int userId = buffer.getInt();
        int usernameLen = buffer.getInt();
        String username = new String(data, 8, usernameLen);
        int passwordLen = buffer.getInt();
        String password = new String(data, 8 + usernameLen, passwordLen);
        int age = buffer.getInt();
        User user = new User();
        user.setUserId(userId);
        user.setUsername(username);
        user.setPassword(password);
        user.setAge(age);
        return user;
    }

    @Override
    public User deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}

消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。

消费端定义消息拦截器,要实现 ConsumerInterceptor接口:


一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等

该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。

ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。

ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程

public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("=== 消费者拦截器 01 onConsume ===");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("=== 消费者拦截器 01 onCommit ===");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("消费者设置的参数");
        configs.forEach((k, v) -> {
            System.out.println(k + ", " + v);
        });
    }
}

位移提交

相关概念

Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)

Consumer 需要为分配给它的每个分区提交各自的位移数据

位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中

位移提交:自动提交和手动提交

位移提交:同步提交和异步提交

自动提交

Kafka Consumer后台提交


开启自动提交 enable.auto.commit=true

配置启动提交间隔:auto.commit.interval.ms,默认是5秒

位移顺序

自动提交位移的顺序:


配置 enable.auto.commit=true

Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的

因此自动提交不会出现消息丢失,但是会重复消费

重复消费

重复消费的场景:


Consumer设置5秒提交offset

假设提交offset后3秒发生了Rebalance

Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费

因为Rebalance发生前3秒的内的提交就丢失了

异步提交

使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset

该方法为同步操作 等待直到 offset 被成功提交才返回

手动同步提交可以控制offset提交的时机和频率

位移管理

Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。

7bbff4ef60b26dc634b932596cea45ed_665537911b164699b7be5a749831bae6.png

重平衡

重平衡可以说是Kafka中诟病最厉害的一部分。

重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。

比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。


重平衡的出发条件主要有三个:


消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。

主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡

订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡

为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。


避免重平衡

要完全避免重平衡做不到,但是要尽量避免重平衡。

在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。


session.timeout.ms 规定超时时间是多久

heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源

max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。

这里给出一些推荐参数的配置:


session.timeout.ms 设置为6秒

heaertbeat.interval.ms 设置2秒

max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟


相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
3天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
335 14
|
18天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
6天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
21天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
23天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2588 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
5天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
178 2
|
3天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
104 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
299 2
|
22天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码