dear all : 目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。 但是根据 “implements DeserializationFormatFactory, SerializationFormatFactory” 这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema 有方法 deserialize(ConsumerRecord<byte[], byte[]> record, Collector collector) 。 包装了offset 的对象:ConsumerRecord ,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?*来自志愿者整理的flink邮件归档
不能,除非你自己创建一个新的kafka connector。
不过,
kafka的offset、partition等信息是可以通过metadata的方式拿到的。
你是需要在DeserializationFormat里面拿到offset、partition?还是说后续的SQL拿到就行了?*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。