Kafka消息序列化和反序列化(上)

简介: Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。

Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。

首先我们通过一段示例代码来看下普通情况下Kafka Producer如何编写:

public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer<String,String> producer = new KafkaProducer<String,String>(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
            try {
                Future<RecordMetadata> future =  producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset()+"    ");
                        System.out.print(metadata.topic()+"    ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

这里采用的客户端不是0.8.x.x时代的Scala版本,而是Java编写的新Kafka Producer, 相应的Maven依赖如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

上面的程序中使用的是Kafka客户端自带的org.apache.kafka.common.serialization.StringSerializer,除了用于String类型的序列化器之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三种方法:

  1. public void configure(Map<String, ?> configs, boolean isKey):用来配置当前类。
  2. public byte[] serialize(String topic, T data):用来执行序列化。
  3. public void close():用来关闭当前序列化器。一般情况下这个方法都是个空方法,如果实现了此方法,必须确保此方法的幂等性,因为这个方法很可能会被KafkaProducer调用多次。

下面我们来看看Kafka中org.apache.kafka.common.serialization.StringSerializer的具体实现,源码如下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先看下StringSerializer中的configure(Map

public class Company {
    private String name;
    private String address;
    //省略Getter, Setter, Constructor & toString方法
}

接下去我们来实现Company类型的Serializer,即下面代码示例中的DemoSerializer。

package com.hidden.client;
public class DemoSerializer implements Serializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用时只需要在Kafka Producer的config中修改value.serializer属性即可,示例如下:

properties.put("value.serializer", "com.hidden.client.DemoSerializer");
//记得也要将相应的String类型改为Company类型,如:
//Producer<String,Company> producer = new KafkaProducer<String,Company>(properties);
//Company company = new Company();
//company.setName("hidden.cooperation-" + new Date().getTime());
//company.setAddress("Shanghai, China");
//ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic,company);

示例中只修改了value.serializer,而key.serializer和value.serializer没有什么区别,如果有真实需要,修改以下也未尝不可。

接下一篇:Kafka消息序列化和反序列化(下)


欢迎支持笔者新书:《RabbitMQ实战指南》以及关注微信公众号:Kafka技术专栏。
这里写图片描述

目录
相关文章
|
2月前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。
|
2月前
|
存储 安全 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第22天】在Java的世界里,对象序列化和反序列化是数据持久化和网络传输的关键技术。本文将带你了解如何在Java中实现对象的序列化与反序列化,并探讨其背后的原理。通过实际代码示例,我们将一步步展示如何将复杂数据结构转换为字节流,以及如何将这些字节流还原为Java对象。文章还将讨论在使用序列化时应注意的安全性问题,以确保你的应用程序既高效又安全。
|
3月前
|
存储 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第9天】在Java的世界里,对象序列化是连接数据持久化与网络通信的桥梁。本文将深入探讨Java对象序列化的机制、实践方法及反序列化过程,通过代码示例揭示其背后的原理。从基础概念到高级应用,我们将一步步揭开序列化技术的神秘面纱,让读者能够掌握这一强大工具,以应对数据存储和传输的挑战。
|
3月前
|
存储 安全 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第3天】在Java编程的世界里,对象序列化与反序列化是实现数据持久化和网络传输的关键技术。本文将深入探讨Java序列化的原理、应用场景以及如何通过代码示例实现对象的序列化与反序列化过程。从基础概念到实践操作,我们将一步步揭示这一技术的魅力所在。
|
3月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
82 3
|
3月前
|
消息中间件 存储 分布式计算
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
35 1
|
2月前
|
存储 缓存 NoSQL
一篇搞懂!Java对象序列化与反序列化的底层逻辑
本文介绍了Java中的序列化与反序列化,包括基本概念、应用场景、实现方式及注意事项。序列化是将对象转换为字节流,便于存储和传输;反序列化则是将字节流还原为对象。文中详细讲解了实现序列化的步骤,以及常见的反序列化失败原因和最佳实践。通过实例和代码示例,帮助读者更好地理解和应用这一重要技术。
75 0
|
4月前
|
JSON fastjson Java
niubility!即使JavaBean没有默认无参构造器,fastjson也可以反序列化。- - - - 阿里Fastjson反序列化源码分析
本文详细分析了 Fastjson 反序列化对象的源码(版本 fastjson-1.2.60),揭示了即使 JavaBean 沲有默认无参构造器,Fastjson 仍能正常反序列化的技术内幕。文章通过案例展示了 Fastjson 在不同构造器情况下的行为,并深入探讨了 `ParserConfig#getDeserializer` 方法的核心逻辑。此外,还介绍了 ASM 字节码技术的应用及其在反序列化过程中的角色。
116 10
|
4月前
|
JSON 安全 编译器
扩展类实例的序列化和反序列化
扩展类实例的序列化和反序列化
55 1
|
4月前
|
存储 XML JSON
用示例说明序列化和反序列化
用示例说明序列化和反序列化
35 1