大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


消费者的基本流程

消费者的参数、参数补充

Kafka 消息发送(Message Production)

在 Kafka 中,消息发送是指生产者将数据写入 Kafka 主题的过程。生产者是负责创建和发送消息的客户端应用,它们将数据转换为 Kafka 可识别的格式并发送到指定的主题中。


消息发送的过程

消息创建:生产者创建消息,包括主题名称、键(可选)、消息体等。键用于控制消息的分区,而消息体是实际的业务数据。

序列化:在消息发送之前,生产者需要将消息键和消息体序列化为字节数组,Kafka 只能处理字节数组格式的数据。

选择分区:消息被序列化后,生产者根据某种逻辑(如默认的哈希算法或自定义逻辑)将消息分配到某个特定的分区。

发送消息:消息被发送到 Kafka 集群的指定分区。Kafka 的 Broker 接收到消息后,会将其写入相应分区的日志文件中。

发送消息的配置参数

acks:定义生产者需要等待多少个副本确认消息已经收到,才认为消息发送成功。常见的值包括 0(不等待)、1(等待 Leader 确认)、all(等待所有副本确认)。

retries:当消息发送失败时,生产者重试的次数。

batch.size:生产者在发送消息前积累的消息批次大小。批次越大,吞吐量越高,但也会增加延迟。

自定义序列化器(Custom Serializer)

在 Kafka 中,生产者发送的消息需要先经过序列化处理。Kafka 提供了默认的序列化器(如 StringSerializer、ByteArraySerializer 等),但在某些情况下,可能需要自定义序列化器以支持特定的数据格式或优化性能。


什么是序列化器

序列化器的作用:序列化器将生产者的消息对象(如字符串、Java 对象等)转换为字节数组,以便 Kafka 能够存储和传输数据。

Kafka 的默认序列化器:Kafka 提供了多种默认序列化器来处理常见的数据类型,如字符串、整数和字节数组。

自定义序列化器的场景

复杂数据结构:当你的消息是复杂的对象结构(如嵌套的 JSON 对象、ProtoBuf 等),默认的序列化器可能无法满足需求。这时可以编写自定义序列化器,来处理这些复杂的结构。

性能优化:在一些高性能场景下,默认的序列化器可能无法满足低延迟、高吞吐量的需求。通过定制化的序列化器,可以优化序列化过程的效率。

自定义分区器(Custom Partitioner)

在 Kafka 中,分区器决定了消息被发送到哪个分区。Kafka 提供了默认的分区器(通常基于消息的键进行哈希计算),但在一些场景下,你可能希望自定义分区逻辑,以实现特定的消息分布策略。


分区器的作用

控制消息的分区:分区器的主要作用是根据消息的键或其他属性来确定消息应该发送到哪个分区。默认情况下,Kafka 使用键的哈希值来确定分区。

分区的意义:通过合理分配分区,可以实现消息的负载均衡、提高系统的并行处理能力,并确保相同键的消息总是被发送到同一个分区。

自定义分区器的场景

定制化的消息分布:在某些场景下,可能需要根据业务逻辑将消息定向到特定的分区。例如,按照用户 ID 分区、按照消息类型分区等。

特殊的分区需求:某些情况下,你可能希望确保某些分区具有更高的优先级或更大的存储能力,这时可以使用自定义分区器来实现这些需求。

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。

序列化器作用就是用于序列化要发送的消息的。

Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {

    private String username;

    private String password;

    private Integer age;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        if (null == data) {
            return null;
        }
        int userId = data.getUserId();
        String username = data.getUsername();
        String password = data.getPassword();
        int age = data.getAge();

        int usernameLen = 0;
        byte[] usernameBytes;
        if (null != username) {
            usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            usernameLen = usernameBytes.length;
        } else {
            usernameBytes = new byte[0];

        }

        int passwordLen = 0;
        byte[] passwordBytes;
        if (null != password) {
            passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            passwordLen = passwordBytes.length;
        } else {
            passwordBytes = new byte[0];
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);
        byteBuffer.putInt(userId);
        byteBuffer.putInt(usernameLen);
        byteBuffer.put(usernameBytes);
        byteBuffer.putInt(passwordLen);
        byteBuffer.put(passwordBytes);
        byteBuffer.putInt(age);
        return byteBuffer.array();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

分区器

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer

可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
2天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
308 14
|
18天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
5天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
20天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
22天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2584 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
4天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
177 2
|
2天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
102 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
283 2
|
22天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码