开发者社区> 问答> 正文

使用kafka接收消息时,报InvalidProtocolBufferException$Inval

使用的版本 1.1.3 drop table以后,运行CanalKafkaClientExample,报如下错误:

2019-08-20 09:22:50.159 [Thread-1] ERROR c.a.otter.canal.example.kafka.CanalKafkaClientExample - Error deserializing key/value for partition example2-0 at offset 0. If needed, please seek past the record to continue consumption. org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition example2-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: deserializer failed at com.alibaba.otter.canal.client.CanalMessageDeserializer.deserializer(CanalMessageDeserializer.java:54) ~[classes/:na] at com.alibaba.otter.canal.client.CanalMessageDeserializer.deserializer(CanalMessageDeserializer.java:14) ~[classes/:na] at com.alibaba.otter.canal.client.kafka.MessageDeserializer.deserialize(MessageDeserializer.java:24) ~[classes/:na] at com.alibaba.otter.canal.client.kafka.MessageDeserializer.deserialize(MessageDeserializer.java:16) ~[classes/:na] at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.1.jar:na] at com.alibaba.otter.canal.client.kafka.KafkaCanalConnector.getListWithoutAck(KafkaCanalConnector.java:184) ~[classes/:na] at com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample.process(CanalKafkaClientExample.java:113) [classes/:na] at com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample.access$000(CanalKafkaClientExample.java:19) [classes/:na] at com.alibaba.otter.canal.example.kafka.CanalKafkaClientExample$3.run(CanalKafkaClientExample.java:77) [classes/:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_202] Caused by: com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException: Protocol message tag had invalid wire type. at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:115) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:551) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:514) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:633) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:295) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.CodedInputStream$ArrayDecoder.readGroup(CodedInputStream.java:868) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:541) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.GeneratedMessageV3.parseUnknownFieldProto3(GeneratedMessageV3.java:305) ~[protobuf-java-3.6.1.jar:na] at com.alibaba.otter.canal.protocol.CanalPacket$Packet.(CanalPacket.java:475) ~[classes/:na] at com.alibaba.otter.canal.protocol.CanalPacket$Packet.(CanalPacket.java:409) ~[classes/:na] at com.alibaba.otter.canal.protocol.CanalPacket$Packet$1.parsePartialFrom(CanalPacket.java:1396) ~[classes/:na] at com.alibaba.otter.canal.protocol.CanalPacket$Packet$1.parsePartialFrom(CanalPacket.java:1390) ~[classes/:na] at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:163) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:197) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:209) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:214) ~[protobuf-java-3.6.1.jar:na] at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) ~[protobuf-java-3.6.1.jar:na] at com.alibaba.otter.canal.protocol.CanalPacket$Packet.parseFrom(CanalPacket.java:867) ~[classes/:na] at com.alibaba.otter.canal.client.CanalMessageDeserializer.deserializer(CanalMessageDeserializer.java:22) ~[classes/:na] ... 18 common frames omitted

kafka里的消息如下:

提问219.png

原提问者GitHub用户kangjiabang

展开
收起
数据大拿 2023-05-04 11:26:10 138 0
1 条回答
写回答
取消 提交回答
  • 理解一下文档里canal.mq.flatMessage = true的意义,解析器配错了

    原回答者GitHub用户agapple

    2023-05-05 10:14:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载