开发者社区> 问答> 正文

sql-client batch 模式执行报错

我在sql-client提交任务:

create table csv_source1( id varchar, name varchar ) with ( type ='csv', path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv' );

create table csv_sink( id varchar, name varchar ) with ( type ='csv', path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv' );

insert into csv_sink select t1.name,t1.id from csv_source1 t1

错误是org.apache.flink.table.api.TableEnvironment这个类1300行空指针,用execution batch模式不行,用execution streaming模式是可以的。请问下才能batch模式执行这个sql?*来自志愿者整理的flink邮件归档

展开
收起
毛毛虫雨 2021-12-07 13:57:47 580 0
1 条回答
写回答
取消 提交回答
  • 我Debug了一下,目前Batch模式下的确会有问题,问题在于CsvTableFactory实现了 BatchCompatibleTableSinkFactory 而不是 BatchTableSinkFactory。

    /** * A CSV table factory. */ public class CsvTableFactory implements StreamTableSourceFactory , BatchTableSourceFactory , StreamTableSinkFactory , BatchCompatibleTableSinkFactory {

    而在 ExternalTableUtil.scala(line 111) 中使用Java SPI 寻找对应的Factory时,我们传入的范型参数是 BatchTableSinkFactory。

    /** * Converts an [[CatalogTable]] instance to a [[TableSink]] instance * * @param name name of the table * @param externalTable the [[CatalogTable]] instance to convert * @param isStreaming is in streaming mode or not. * @return */ def toTableSink( name: String, externalTable: CatalogTable, isStreaming: Boolean): TableSink[_] = {

    val tableProperties: TableProperties = generateTableProperties(name, externalTable, isStreaming) if (isStreaming) { val tableFactory = TableFactoryService.find(classOf[StreamTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap) } else { val tableFactory = TableFactoryService.find(classOf[BatchTableSinkFactory[_]], getToolDescriptor(getStorageType(name, tableProperties), tableProperties)) tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap) } }

    这样会导致找不到我们想要的 CsvTableFactory。进而会报一个 NoMatchingTableFactoryException的异常。

    我们引入 BatchCompatibleTableSinkFactory 的初衷是想复用部分connector的代码(批复用流),但现在看来还有些问题没考虑到。我们正在考虑如何修正。这里一个快速的fix是 参照 TableFactoryUtil(line97-108)的做法,在找不到 BatchTableSinkFactory 是,用 BatchCompatibleTableSinkFactory 再找一轮。更优雅的修复请等待我们的patch。

    至于为什么你这里是抛一个NPE,可能需要你远程调试一下,我不确定你的代码是否修改过。*来自志愿者整理的flink

    2021-12-07 15:27:16
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载