大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

简介: 大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


消费者的基本流程

消费者的参数、参数补充

Kafka 消息发送(Message Production)

在 Kafka 中,消息发送是指生产者将数据写入 Kafka 主题的过程。生产者是负责创建和发送消息的客户端应用,它们将数据转换为 Kafka 可识别的格式并发送到指定的主题中。


消息发送的过程

消息创建:生产者创建消息,包括主题名称、键(可选)、消息体等。键用于控制消息的分区,而消息体是实际的业务数据。

序列化:在消息发送之前,生产者需要将消息键和消息体序列化为字节数组,Kafka 只能处理字节数组格式的数据。

选择分区:消息被序列化后,生产者根据某种逻辑(如默认的哈希算法或自定义逻辑)将消息分配到某个特定的分区。

发送消息:消息被发送到 Kafka 集群的指定分区。Kafka 的 Broker 接收到消息后,会将其写入相应分区的日志文件中。

发送消息的配置参数

acks:定义生产者需要等待多少个副本确认消息已经收到,才认为消息发送成功。常见的值包括 0(不等待)、1(等待 Leader 确认)、all(等待所有副本确认)。

retries:当消息发送失败时,生产者重试的次数。

batch.size:生产者在发送消息前积累的消息批次大小。批次越大,吞吐量越高,但也会增加延迟。

自定义序列化器(Custom Serializer)

在 Kafka 中,生产者发送的消息需要先经过序列化处理。Kafka 提供了默认的序列化器(如 StringSerializer、ByteArraySerializer 等),但在某些情况下,可能需要自定义序列化器以支持特定的数据格式或优化性能。


什么是序列化器

序列化器的作用:序列化器将生产者的消息对象(如字符串、Java 对象等)转换为字节数组,以便 Kafka 能够存储和传输数据。

Kafka 的默认序列化器:Kafka 提供了多种默认序列化器来处理常见的数据类型,如字符串、整数和字节数组。

自定义序列化器的场景

复杂数据结构:当你的消息是复杂的对象结构(如嵌套的 JSON 对象、ProtoBuf 等),默认的序列化器可能无法满足需求。这时可以编写自定义序列化器,来处理这些复杂的结构。

性能优化:在一些高性能场景下,默认的序列化器可能无法满足低延迟、高吞吐量的需求。通过定制化的序列化器,可以优化序列化过程的效率。

自定义分区器(Custom Partitioner)

在 Kafka 中,分区器决定了消息被发送到哪个分区。Kafka 提供了默认的分区器(通常基于消息的键进行哈希计算),但在一些场景下,你可能希望自定义分区逻辑,以实现特定的消息分布策略。


分区器的作用

控制消息的分区:分区器的主要作用是根据消息的键或其他属性来确定消息应该发送到哪个分区。默认情况下,Kafka 使用键的哈希值来确定分区。

分区的意义:通过合理分配分区,可以实现消息的负载均衡、提高系统的并行处理能力,并确保相同键的消息总是被发送到同一个分区。

自定义分区器的场景

定制化的消息分布:在某些场景下,可能需要根据业务逻辑将消息定向到特定的分区。例如,按照用户 ID 分区、按照消息类型分区等。

特殊的分区需求:某些情况下,你可能希望确保某些分区具有更高的优先级或更大的存储能力,这时可以使用自定义分区器来实现这些需求。

序列化器

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要将数据序列化成为字节数组。

序列化器作用就是用于序列化要发送的消息的。

Kafka通过 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数据。

public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

其中Kafka也内置了一些实现好的序列化器:

  • ByteArraySerializer
  • StringSerializer
  • DoubleSerializer
  • 等等… 具体可以自行查看

自定义序列化器

自定义实体类

实现一个简单的类:

public class User {

    private String username;

    private String password;

    private Integer age;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

实现序列化

注意对象中的内容转换为字节数组的过程,要计算好开启的空间!!!

public class UserSerilazer implements Serializer<User> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Serializer.super.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, User data) {
        if (null == data) {
            return null;
        }
        int userId = data.getUserId();
        String username = data.getUsername();
        String password = data.getPassword();
        int age = data.getAge();

        int usernameLen = 0;
        byte[] usernameBytes;
        if (null != username) {
            usernameBytes = username.getBytes(StandardCharsets.UTF_8);
            usernameLen = usernameBytes.length;
        } else {
            usernameBytes = new byte[0];

        }

        int passwordLen = 0;
        byte[] passwordBytes;
        if (null != password) {
            passwordBytes = password.getBytes(StandardCharsets.UTF_8);
            passwordLen = passwordBytes.length;
        } else {
            passwordBytes = new byte[0];
        }

        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + usernameLen + 4 + passwordLen + 4);
        byteBuffer.putInt(userId);
        byteBuffer.putInt(usernameLen);
        byteBuffer.put(usernameBytes);
        byteBuffer.putInt(passwordLen);
        byteBuffer.put(passwordBytes);
        byteBuffer.putInt(age);
        return byteBuffer.array();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, User data) {
        return Serializer.super.serialize(topic, headers, data);
    }

    @Override
    public void close() {
        Serializer.super.close();
    }
}

分区器

默认情况下的分区计算:

  • 如果Record提供了分区号,则使用Record提供的分区号
  • 如果Record没有提供分区号,则使用Key序列化后值的Hash值对分区数取模
  • 如果Record没有提供分区号,也没有提供Key,则使用轮询的方式分配分区号

我们在这里可以看到对应的内容:

org.apache.kafka.clients.producer

可以看到,如果 Partition 是 null的话,会有函数来进行分区,跟进去,可以看到如下方法:

自定义分区器

如果要自定义分区器, 需要:

  • 首先开发Partitioner接口中的实现类
  • 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xxx.xxx.class”)
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
目录
相关文章
|
8月前
|
传感器 Java 大数据
Java 大视界 -- 基于 Java 的大数据实时数据处理在车联网车辆协同控制中的应用与挑战(197)
本文深入探讨了基于 Java 的大数据实时数据处理在车联网车辆协同控制中的关键应用与技术挑战。内容涵盖数据采集、传输与实时处理框架,并结合实际案例分析了其在车辆状态监测、交通优化与协同驾驶中的应用效果,展示了 Java 大数据技术在提升交通安全性与效率方面的巨大潜力。
|
8月前
|
存储 分布式计算 Java
Java 大视界 -- Java 大数据在智能建筑能耗监测与节能策略制定中的应用(182)
本文探讨了Java大数据技术在智能建筑能耗监测与节能策略制定中的关键应用。通过Hadoop、Spark等技术实现能耗数据的存储、分析与可视化,结合实际案例,展示了Java大数据如何助力建筑行业实现节能减排目标。
|
8月前
|
存储 机器学习/深度学习 Java
Java 大视界 -- Java 大数据在智慧水利水资源调度与水情预测中的应用创新(180)
本文探讨了Java大数据技术在智慧水利中的创新应用,重点分析了其在水资源调度与水情预测中的关键技术与实践案例。通过大数据存储、实时处理与深度学习模型,Java有效提升了水利数据管理效率与水情预测准确性,助力传统水利向智能化转型。
|
6月前
|
存储 并行计算 算法
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
【动态多目标优化算法】基于自适应启动策略的混合交叉动态约束多目标优化算法(MC-DCMOEA)求解CEC2023研究(Matlab代码实现)
295 4
|
8月前
|
存储 搜索推荐 算法
Java 大视界 -- Java 大数据在智能金融理财产品风险评估与个性化配置中的应用(195)
本文深入探讨了Java大数据技术在智能金融理财产品风险评估与个性化配置中的关键应用。通过高效的数据采集、存储与分析,Java大数据技术助力金融机构实现精准风险评估与个性化推荐,提升投资收益并降低风险。
Java 大视界 -- Java 大数据在智能金融理财产品风险评估与个性化配置中的应用(195)
|
7月前
|
存储 供应链 数据可视化
Java 大视界 -- 基于 Java 的大数据可视化在企业供应链风险预警与决策支持中的应用(204)
本篇文章探讨了基于 Java 的大数据可视化技术在企业供应链风险预警与决策支持中的深度应用。文章系统介绍了从数据采集、存储、处理到可视化呈现的完整技术方案,结合供应链风险预警与决策支持的实际案例,展示了 Java 大数据技术如何助力企业实现高效、智能的供应链管理。
|
7月前
|
存储 SQL Java
Java 大视界 -- Java 大数据在智能医疗手术风险评估与术前方案制定中的应用探索(203)
本文探讨了Java大数据技术在智能医疗手术风险评估与术前方案制定中的创新应用。通过多源数据整合、智能分析模型构建及知识图谱技术,提升手术风险预测准确性与术前方案制定效率,助力医疗决策智能化,推动精准医疗发展。
|
9月前
|
机器学习/深度学习 数据挖掘 大数据
大数据集特征工程实践:将54万样本预测误差降低68%的技术路径与代码实现详解
本文通过实际案例演示特征工程在回归任务中的应用效果,重点分析包含数值型、分类型和时间序列特征的大规模表格数据集的处理方法。
343 0
大数据集特征工程实践:将54万样本预测误差降低68%的技术路径与代码实现详解
|
8月前
|
机器学习/深度学习 Java 大数据
Java 大视界 -- Java 大数据在智能政务公共资源交易数据分析与监管中的应用(202)
本篇文章深入探讨了 Java 大数据在智能政务公共资源交易监管中的创新应用。通过构建高效的数据采集、智能分析与可视化决策系统,Java 大数据技术成功破解了传统监管中的数据孤岛、效率低下和监管滞后等难题,为公共资源交易打造了“智慧卫士”,助力政务监管迈向智能化、精准化新时代。
|
8月前
|
数据采集 机器学习/深度学习 Java
Java 大视界 -- Java 大数据在智能体育赛事运动员体能监测与训练计划调整中的应用(200)
本篇文章聚焦 Java 大数据在智能体育赛事中对运动员体能监测与训练计划的智能化应用。通过构建实时数据采集与分析系统,结合机器学习模型,实现对运动员体能状态的精准评估与训练方案的动态优化,推动体育训练迈向科学化、个性化新高度。

热门文章

最新文章