开发者社区> 问答> 正文

如何使用FlinkKafkaConsumer分别解析密钥<K,V>而不是<T>

"根据我所知,通过Flink的AVRO反序列化,你可以创建一个Avro对象流,但存在一个问题,Flink的kafka消费者只创建单个对象的流: FlinkKafkaConsumerBase而不是默认的Kafka API与其KafkaConsumer。

在我的例子中,Key和Value都是单独的AVRO模式兼容对象,合并他们的模式可能是一场噩梦......

此外,使用Flink API无法检索ConsumerRecord信息?"

展开
收起
flink小助手 2018-11-28 15:46:30 3576 0
1 条回答
写回答
取消 提交回答
  • flink小助手会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关flink的问题及回答。

    "基于Flink Kafka Consumer,有一个构造函数:class KafkaRecord {
    private K key;
    private V value;
    private long offset;
    private int partition;
    private String topic;

    ...
    }

    class MySchema implements KeyedDeserializationSchema> {
    KafkaRecord deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {

    KafkaRecord<K, V> rec = new KafkaRecord<>();
    rec.key = KEY_DESERIaLISER.deserialize(messageKey);
    rec.value = ...;
    rec.topic = topic;
    ...

    }
    }

    public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema deserializer, Properties props) {

    this(Collections.singletonList(topic), deserializer, props);

    }
    第二个参数 - KeyedDeserializationSchema用于反序列化Kafka记录。它包括消息密钥,消息值,偏移量,主题等。因此,你可以MyKafkaRecord使用Avro密钥和Avro值实现名为T的自己的类型。然后,通过MyKafkaRecord为T您实现KeyedDeserializationSchema。请参考TypeInformationKeyValueSerializationSchema一个例子。

    例如,阅读卡夫卡的额外信息:"

    2019-07-17 23:16:46
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
神龙云服务器产品及技术深度解析 立即下载
弹性创造价值:基于ECS的最佳性价比实践解析 立即下载
又快又稳:阿里云下一代虚拟交换机解析 立即下载

相关镜像