大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


消费者的基本流程

消费者的参数、参数补充

Kafka 消息发送(Message Production)

在 Kafka 中,消息发送是指生产者将数据写入 Kafka 主题的过程。生产者是负责创建和发送消息的客户端应用,它们将数据转换为 Kafka 可识别的格式并发送到指定的主题中。


消息发送的过程

消息创建:生产者创建消息,包括主题名称、键(可选)、消息体等。键用于控制消息的分区,而消息体是实际的业务数据。

序列化:在消息发送之前,生产者需要将消息键和消息体序列化为字节数组,Kafka 只能处理字节数组格式的数据。

选择分区:消息被序列化后,生产者根据某种逻辑(如默认的哈希算法或自定义逻辑)将消息分配到某个特定的分区。

发送消息:消息被发送到 Kafka 集群的指定分区。Kafka 的 Broker 接收到消息后,会将其写入相应分区的日志文件中。

发送消息的配置参数

acks:定义生产者需要等待多少个副本确认消息已经收到,才认为消息发送成功。常见的值包括 0(不等待)、1(等待 Leader 确认)、all(等待所有副本确认)。

retries:当消息发送失败时,生产者重试的次数。

batch.size:生产者在发送消息前积累的消息批次大小。批次越大,吞吐量越高,但也会增加延迟。

自定义序列化器(Custom Serializer)

在 Kafka 中,生产者发送的消息需要先经过序列化处理。Kafka 提供了默认的序列化器(如 StringSerializer、ByteArraySerializer 等),但在某些情况下,可能需要自定义序列化器以支持特定的数据格式或优化性能。


什么是序列化器

序列化器的作用:序列化器将生产者的消息对象(如字符串、Java 对象等)转换为字节数组,以便 Kafka 能够存储和传输数据。

Kafka 的默认序列化器:Kafka 提供了多种默认序列化器来处理常见的数据类型,如字符串、整数和字节数组。

自定义序列化器的场景

复杂数据结构:当你的消息是复杂的对象结构(如嵌套的 JSON 对象、ProtoBuf 等),默认的序列化器可能无法满足需求。这时可以编写自定义序列化器,来处理这些复杂的结构。

性能优化:在一些高性能场景下,默认的序列化器可能无法满足低延迟、高吞吐量的需求。通过定制化的序列化器,可以优化序列化过程的效率。

自定义分区器(Custom Partitioner)

在 Kafka 中,分区器决定了消息被发送到哪个分区。Kafka 提供了默认的分区器(通常基于消息的键进行哈希计算),但在一些场景下,你可能希望自定义分区逻辑,以实现特定的消息分布策略。


分区器的作用

控制消息的分区:分区器的主要作用是根据消息的键或其他属性来确定消息应该发送到哪个分区。默认情况下,Kafka 使用键的哈希值来确定分区。

分区的意义:通过合理分配分区,可以实现消息的负载均衡、提高系统的并行处理能力,并确保相同键的消息总是被发送到同一个分区。

自定义分区器的场景

定制化的消息分布:在某些场景下,可能需要根据业务逻辑将消息定向到特定的分区。例如,按照用户 ID 分区、按照消息类型分区等。

特殊的分区需求:某些情况下,你可能希望确保某些分区具有更高的优先级或更大的存储能力,这时可以使用自定义分区器来实现这些需求。

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。

序列化器作用就是用于序列化要发送的消息的。

Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {

    private String username;

    private String password;

    private Integer age;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        if (null == data) {
            return null;
        }
        int userId = data.getUserId();
        String username = data.getUsername();
        String password = data.getPassword();
        int age = data.getAge();

        int usernameLen = 0;
        byte[] usernameBytes;
        if (null != username) {
            usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            usernameLen = usernameBytes.length;
        } else {
            usernameBytes = new byte[0];

        }

        int passwordLen = 0;
        byte[] passwordBytes;
        if (null != password) {
            passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            passwordLen = passwordBytes.length;
        } else {
            passwordBytes = new byte[0];
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);
        byteBuffer.putInt(userId);
        byteBuffer.putInt(usernameLen);
        byteBuffer.put(usernameBytes);
        byteBuffer.putInt(passwordLen);
        byteBuffer.put(passwordBytes);
        byteBuffer.putInt(age);
        return byteBuffer.array();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

分区器

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer

可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
目录
相关文章
|
5天前
|
分布式计算 负载均衡 监控
大数据增加分区数量
【11月更文挑战第4天】
21 3
|
1月前
|
自然语言处理 大数据 应用服务中间件
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
大数据-172 Elasticsearch 索引操作 与 IK 分词器 自定义停用词 Nginx 服务
63 5
|
10天前
|
存储 安全 大数据
大数据水平分区增强可管理性
【11月更文挑战第2天】
23 5
|
10天前
|
存储 负载均衡 大数据
大数据水平分区提高查询性能
【11月更文挑战第2天】
24 4
|
9天前
|
存储 分布式计算 大数据
大数据减少单个分区的数据量
【11月更文挑战第3天】
28 2
|
11天前
|
存储 算法 大数据
大数据复合分区(Composite Partitioning)
【11月更文挑战第1天】
29 1
|
11天前
|
存储 大数据 数据管理
大数据垂直分区(Vertical Partitioning)
【11月更文挑战第1天】
23 1
|
11天前
|
存储 固态存储 大数据
大数据水平分区(Horizontal Partitioning)
【11月更文挑战第1天】
21 1
|
12天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
43 2
|
13天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?

热门文章

最新文章