各位大佬:
由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 KafkaUpsertTableSink:
KafkaUpsertTableSink
KafkaUpsertTableSinkBase
KafkaUpsertTableSourceSinkFactory
KafkaUpsertTableSourceSinkFactoryBase
MyKafkaValidator
但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 呢?
/** * Searches for factories using Java service providers. * * @return all factories in the classpath */ private static List discoverFactories(Optional classLoader) { try { List result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); //todo add result.add(new KafkaUpsertTableSourceSinkFactory()); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); }
}
直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 以成功运行的。
非常感谢*来自志愿者整理的flink邮件归档
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。