Pre
#kafka spring.kafka.bootstrap-servers=10.11.114.247:9092 spring.kafka.producer.acks=1 spring.kafka.producer.retries=3 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.consumer.group-id=zfprocessor_group spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages spring.kafka.consumer.max-poll-records=500 spring.kafka.consumer.fetch-min-size=10 spring.kafka.consumer.fetch-max-wait=10000ms spring.kafka.listener.missing-topics-fatal=false spring.kafka.listener.type=batch spring.kafka.listener.ack-mode=manual logging.level.org.springframework.kafka=ERROR logging.level.org.apache.kafka=ERROR
我们看看消费者反序列化,解析value的配置
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
Question
spring kafka 使用Jackson序列化, 如果存入kafka中的对象 包含 泛型,那么 默认情况下,这个泛型对象会被Jackson反序列为 LinkedHashMap . 抛出类型转换异常…
Answer
在实体类上增加如下注解
@Data public class Message<T> { private int messageCode; @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,include = JsonTypeInfo.As.PROPERTY,property = "@class") private T messageContent; }
反序列化后,多了个节点
扩展知识
Jackson JSON - Using @JsonTypeInfo annotation to handle polymorphic types