开发者社区> 问答> 正文

【flink】flink sql insert into插入语句的问题

测试Flink版本:1.11.0

Flink 支持这种语法插入吗,在插入时指定具体的内容和要插入的列

插入 tableName(col1[,col2]) 选择 col1[,col2]

通过测试发现了以下问题

建表语句:

用 () 创建表 t1(a int,b string,c int);

用 () 创建表 t2(a int,b string,c int);

问题1:测试发现插入时发现和接收器模式的匹配规则是按照定义的顺序进行

测试语句:

insert into t2 select t1.a,t1.c, t1.b from t1;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:891)

在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1334)

在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题2:支持Insert into tableName(col1[,col2]) select col1[,col2]的语法,但并 没有自己,还是按照定义的顺序进行匹配

测试语句:

插入 t2(a,c,b) 从 t1 中选择 t1.a,t1.c, t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: INT, b: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:891)

在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1334)

在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

问题3:当插入到你的字段比sink的模式中的字段少的时候也一样

测试语句:

插入 t2(a,b)

从 t1 中选择 t1.a、t1.b;

报错信息:

org.apache.flink.table.api.ValidationException:查询结果的字段类型 和注册的 TableSink default_catalog.default_database.t2 不匹配。

查询模式:[a: INT, c: VARCHAR(2147483647)]

接收器架构:[a: INT, b: VARCHAR(2147483647), c: INT]

在 org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyI mplicitCast(TableSinkUtils.scala:100)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:213)

在 org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plann erBase.scala:204)

在 scala.Option.map(Option.scala:146)

在 org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(Planner Base.scala:204)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:98)

在 org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(Str eamPlanner.scala:80)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala: 234)

在 scala.collection.Iterator$class.foreach(Ite​​rator.scala:891)

在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1334)

在 scala.collection.IterableLike$class.foreach(Ite​​rableLike.scala:72)

在 scala.collection.AbstractIterable.foreach(Ite​​rable.scala:54)

在 scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

在 scala.collection.AbstractTraversable.map(Traversable.scala:104)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:80)

在 org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanne r.scala:43)

在 org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnviro nmentImpl.java:632)

总结:

现在的实现限制了查询的和写人的知识,

只能找到schema定义的字段顺序才能正确的插入,

当很多时候会比较麻烦,

还有,只能插入部分列的需求也是存在的,目前不能支持*来自志愿者整理的flink邮件归档

展开
收起
小阿怪 2021-12-07 10:08:25 1540 0
1 条回答
写回答
取消 提交回答
  • 你好,

    Flink 目前已经不支持这个语法……我创建了一个问题[1],可以在里面查出这个特性的进展。

    [1] https://issues.apache.org/jira/browse/FLINK-18726*来自志愿者整理的flink邮件归档

    2021-12-07 11:41:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载