I generated a UserView class from a schema with the Avro code-generation tool and implemented Kafka's Serializer interface, as follows:

import org.apache.kafka.common.serialization.Serializer;
import java.io.IOException;

public class UserViewSerializer implements Serializer<UserView> {
    @Override
    public byte[] serialize(String topic, UserView data) {
        byte[] array = null;
        try {
            // Serialize the Avro-generated record with its built-in encoder
            array = data.toByteBuffer().array();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return array;
    }
}

Then I constructed a Kafka producer and wrote UserView instances to the Kafka topic:

KafkaProducer<String, UserView> producer =
        new KafkaProducer<>(props, new StringSerializer(), new UserViewSerializer());

In the Flink job I consume the Avro messages from that topic with FlinkKafkaConsumer, and this is where the problem described in the subject appears. The failing code:

FlinkKafkaConsumer<GenericRecord> myConsumer =
        new FlinkKafkaConsumer<>("UserView", AvroDeserializationSchema.forGeneric(SCHEMA), properties);

The exception that makes the job fail is:
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw(BinaryDecoder.java:827)
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Any advice would be greatly appreciated.
From the volunteer-compiled Flink mailing-list archive.
When you wrote the Avro data into Kafka with your own Kafka Serializer, was the payload actually produced in the Confluent Schema Registry Avro format? The Confluent Schema Registry serializer writes an extra MAGIC_BYTE in front of the record, which plain Avro does not have. Try consuming with ConfluentRegistryAvroDeserializationSchema instead; see [1].
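Not part of the original reply, but a quick way to verify which framing actually landed in the topic is to consume one record with the plain byte-array deserializer and look at its first bytes: the Confluent wire format starts with a single 0x0 magic byte followed by a 4-byte schema id, while a plain Avro binary record starts directly with the first encoded field. A minimal sketch (the helper name is hypothetical):

import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: report whether a raw Kafka record value carries the
// Confluent wire format (magic byte 0x0 + 4-byte schema id) or looks like plain Avro binary.
static void describeFraming(byte[] value) {
    if (value == null || value.length == 0) {
        System.out.println("Empty record value");
    } else if (value.length > 5 && value[0] == 0x0) {
        int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
        System.out.println("Confluent framing, schema id = " + schemaId);
    } else {
        System.out.println("No Confluent magic byte; first bytes: "
                + Arrays.toString(Arrays.copyOf(value, Math.min(5, value.length))));
    }
}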
[1] https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670
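A minimal sketch of the suggested consumer, assuming the schema was registered in a Confluent Schema Registry and flink-avro-confluent-registry is on the classpath; the registry address below is a placeholder:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// "http://localhost:8081" stands in for the real Schema Registry URL.
FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>(
        "UserView",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(SCHEMA, "http://localhost:8081"),
        properties);

Whichever schema is used, the framing written by the producer and the deserialization schema used on the Flink side have to agree.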
From the volunteer-compiled Flink mailing-list archive.