kafka也有序列化?你知道么?

简介: kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)。但是我们在应用层其实可以使用更多的数据类型,比如int,short,long,String等,这归功于kafka的序列化和反序列化机制。

简介


kafka内部发送和接收消息的时候,使用的是byte[]字节数组的方式(RPC底层也是用这种通讯格式)。但是我们在应用层其实可以使用更多的数据类型,比如int,short,

long,String等,这归功于kafka的序列化和反序列化机制。


基本原理分析


在之前的一篇文章[springboot集成kafka示例](http://www.machengyu.net/arch/2019/07/29/kafka-springboot.html)中,我使用的是kafka原生的StringSerializer序列化方式,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

源码如下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";
    public StringSerializer() {
    }
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("serializer.encoding");
        }
        if (encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }
    }
    public byte[] serialize(String topic, String data) {
        try {
            return data == null ? null : data.getBytes(this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
        }
    }
    public void close() {
    }
}


其实很简单,configure方法设置序列化(serialize方法)需要使用的编码,如果没有设置就使用UTF8格式。这个方法是在生成producer实例的时候被调用的。serialize方法使用的就是String的getBytes把String类型的消息转化为byte字节数组。


反序列呢?聪明如你应该能想到,使用new String就可以解决了。源码如下,

@Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

是不是简单到爆呢?


其它的内置序列化组件,像Double, Integer,Long这些原理都类似,就不一一分析了。


自定义序列化组件


有时候内置的组件不能满足我们的需要。比如我有个自定义的对象要作为kafka的消息进行收发(把对象转化为json字符串通过String的方式也是一种思路),希望能有一个针对我这个对象自定义的序列化和反序列化组件。


我们先定义一个消息对象,

@Data
@ToString
public class Person {
    private int id;
    private String name;
    private int age;
}

然后自定义自己的序列化和反序列化实现类,

@Slf4j
public class PersonDeserializer implements Deserializer <Person> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }
    @Override
    public Person deserialize(String s, byte[] bytes) {
        log.info("自定义的反序列化-deserialize");
        return JSON.parseObject(bytes, Person.class);
    }
    @Override
    public void close() {
    }
}
@Slf4j
public class PersonSerializer implements Serializer<Person> {
    private static Gson gson;
    static {
        gson = new GsonBuilder().create();
    }
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        log.info("自定义的序列化组件--configure");
    }
    @Override
    public byte[] serialize(String s, Person person) {
        log.info("自定义的序列化组件--serialize");
        return JSON.toJSONBytes(person);
    }
    @Override
    public void close() {
        log.info("自定义的序列化组件--close");
    }
}

代码一看就明白,其实核心就是利用fastjson的toJSONBytes把对象转化为byte数组。


然后我们在配置里指定使用我们自己的序列化和反序列化实现类,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.ponymaggie.github.kafka.serializer.PersonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.ponymaggie.github.kafka.serializer.PersonDeserializer

测试


本地kafka和zk环境搭建可以参考我之前的一篇文章:


http://www.machengyu.net/arch/2019/07/29/kafka-springboot.html


启动springboot项目,通过日志可以看出消息的收发都是正常的。

2019-08-15 20:06:34.251  INFO 16676 --- [           main] c.p.github.kafka.producer.KafkaSender    : +++++++++++++++++++++  message = {"id":1000,"name":"小明","age":30}
2019-08-15 20:06:34.648  INFO 16676 --- [ntainer#0-0-C-1] c.p.github.kafka.consumer.KafkaReceiver  : ----------------- record =ConsumerRecord(topic = malu, partition = 0, offset = 0, CreateTime = 1565870794430, serialized key size = -1, serialized value size = 36, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Person(id=1000, name=小明, age=30))

源码地址:


https://github.com/pony-maggie/springboot-kafka-serialization-demo

相关文章
|
6天前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
268 4
|
6天前
|
消息中间件 JSON 监控
Kafka 的消息格式:了解消息结构与序列化
Kafka 作为一款高性能的消息中间件系统,其消息格式对于消息的生产、传输和消费起着至关重要的作用。本篇博客将深入讨论 Kafka 的消息格式,包括消息的结构、序列化与反序列化,以及一些常用的消息格式选项。通过更丰富的示例代码和深入的解析,希望能够帮助大家更好地理解 Kafka 消息的内部机制。
|
10月前
|
消息中间件 Java Kafka
kafka 客户端使用Avro序列化
kafka 客户端使用Avro序列化
131 0
|
消息中间件 Kafka API
Flink 1.14.0 消费 kafka 数据自定义反序列化类
在最近发布的 Flink 1.14.0 版本中对 Source 接口进行了重构,细节可以参考 FLIP-27: Refactor Source Interface 重构之后 API 层面的改动还是非常大的,那在使用新的 API 消费 kafka 数据的时候如何自定义序列化类呢?
|
消息中间件 Kafka Apache
Kafka消息序列化和反序列化(上)
Kafka Producer在发送消息时必须配置的参数为:bootstrap.servers、key.serializer、value.serializer。序列化操作是在拦截器(Interceptor)执行之后并且在分配分区(partitions)之前执行的。
1492 0
|
消息中间件 Kafka
Kafka消息序列化和反序列化(下)
接上一篇:Kafka消息序列化和反序列化(上)。 有序列化就会有反序列化,反序列化的操作是在Kafka Consumer中完成的,使用起来只需要配置一下key.deserializer和value.deseriaizer。
1274 0
|
消息中间件 Kafka Apache
Kafka消息序列化和反序列化
Kafka消息序列化和反序列化
3541 0
|
6天前
|
存储 XML JSON
数据传输的艺术:深入探讨序列化与反序列化
数据传输的艺术:深入探讨序列化与反序列化
75 0
|
6天前
|
存储 安全 Java
Java一分钟之-Java序列化与反序列化
【5月更文挑战第14天】Java序列化用于将对象转换为字节流,便于存储和网络传输。实现`Serializable`接口使类可被序列化,但可能引发隐私泄露、版本兼容性和性能问题。要避免这些问题,可使用`transient`关键字、控制`serialVersionUID`及考虑使用安全的序列化库。示例代码展示了如何序列化和反序列化对象,强调了循环引用和未实现`Serializable`的错误。理解并妥善处理这些要点对优化代码至关重要。
15 1
|
6天前
|
JSON 安全 Java
Spring Boot 序列化、反序列化
本文介绍了Spring Boot中的序列化和反序列化。Java提供默认序列化机制,通过实现Serializable接口实现对象到字节流的转换。Spring Boot默认使用Jackson处理JSON,可通过注解和配置自定义规则。然而,序列化可能引发安全问题,建议使用白名单、数据校验和安全库。最佳实践包括使用标准机制、自定义规则及注意版本控制。文章还提醒关注性能并提供了相关参考资料。
70 2

热门文章

最新文章