"根据我所知,通过Flink的AVRO反序列化,你可以创建一个Avro对象流,但存在一个问题,Flink的kafka消费者只创建单个对象的流: FlinkKafkaConsumerBase而不是默认的Kafka API与其KafkaConsumer。
在我的例子中,Key和Value都是单独的AVRO模式兼容对象,合并他们的模式可能是一场噩梦......
此外,使用Flink API无法检索ConsumerRecord信息?"
"基于Flink Kafka Consumer,有一个构造函数:class KafkaRecord {
private K key;
private V value;
private long offset;
private int partition;
private String topic;
...
}
class MySchema implements KeyedDeserializationSchema> {
KafkaRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
KafkaRecord<K, V> rec = new KafkaRecord<>();
rec.key = KEY_DESERIaLISER.deserialize(messageKey);
rec.value = ...;
rec.topic = topic;
...
}
}
public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
第二个参数 - KeyedDeserializationSchema用于反序列化Kafka记录。它包括消息密钥,消息值,偏移量,主题等。因此,你可以MyKafkaRecord使用Avro密钥和Avro值实现名为T的自己的类型。然后,通过MyKafkaRecord为T您实现KeyedDeserializationSchema。请参考TypeInformationKeyValueSerializationSchema一个例子。
例如,阅读卡夫卡的额外信息:"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。