开发者社区> 问答> 正文

如何实现 KafkaUpsertTableSink?

各位大佬:

由于现在的 KafkaTableSink 不支持 sql 中有group ,参照 KafkaTableSink 和 HbaseUpsertTableSink 的一套逻辑实现了一套 KafkaUpsertTableSink:

KafkaUpsertTableSink

KafkaUpsertTableSinkBase

KafkaUpsertTableSourceSinkFactory

KafkaUpsertTableSourceSinkFactoryBase

MyKafkaValidator

但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的 KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册 呢?

/** * Searches for factories using Java service providers. * * @return all factories in the classpath */ private static List discoverFactories(Optional classLoader) { try { List result = new LinkedList<>(); ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader()); ServiceLoader .load(TableFactory.class, cl) .iterator() .forEachRemaining(result::add); //todo add result.add(new KafkaUpsertTableSourceSinkFactory()); return result; } catch (ServiceConfigurationError e) { LOG.error("Could not load service provider for table factories.", e); throw new TableException("Could not load service provider for table factories.", e); }

}

直接在返回的 result 里面添加对应的 KafkaUpsertTableSourceSinkFactory 是可 以成功运行的。

非常感谢*来自志愿者整理的flink邮件归档

展开
收起
塔塔塔塔塔塔 2021-12-02 17:50:46 1125 0
1 条回答
写回答
取消 提交回答
  • 你需要把你新增的Factory添加到 resources下的 

    META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?*来自志愿者整理的FLINK邮件归档

    2021-12-02 18:14:40
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
如何使用Tair增强数据结构构建丰富在线实时场景 立即下载
如何做小程序性能优化 立即下载
“静态调用链路发现”应用场景分析及实践探索 立即下载