开发者社区> 问答> 正文

如何使用kafka的KafkaDynamicTableFactory ,替换他的json forma

dear all : 目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。 但是根据 “implements DeserializationFormatFactory, SerializationFormatFactory” 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema 有方法 deserialize(ConsumerRecord<byte[], byte[]> record, Collector collector) 。 包装了offset 的对象:ConsumerRecord ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?*来自志愿者整理的flink邮件归档

展开
收起
彗星halation 2021-12-02 11:42:36 727 0
1 条回答
写回答
取消 提交回答
  • 不能,除非你自己创建一个新的kafka connector。

    不过,

    kafka的offset、partition等信息是可以通过metadata的方式拿到的。

    你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?*来自志愿者整理的FLINK邮件归档

    2021-12-02 11:55:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载