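Hello, I'm using Flink SQL 1.11 to write data to JDBC and ran into a field type mismatch error. Is something wrong with my type definitions? Below are my exception log and my SQL file.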
SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=3;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
    operation STRING,
    operation_channel STRING,
    `time` STRING,
    ip STRING,
    lat STRING,
    lng STRING,
    user_id STRING,
    device_id STRING,
    imei STRING,
    targets ARRAY<ROW<`type` STRING, `value` STRING>>,
    product_name STRING,
    product_version STRING,
    product_vendor STRING,
    platform STRING,
    platform_version STRING,
    languaage STRING,
    locale STRING,
    other_para MAP<STRING, STRING NULL>
) with (
    'connector'='kafka',
    'topic'='cloud_behavior',
    'properties.bootstrap.servers'='',
    'properties.group.id'='testGroup',
    'format'='avro',
    'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv statistics sink table
create TABLE cloud_behavior_sink(
    operation STRING,
    operation_channel STRING,
    ip STRING,
    lat STRING,
    lng STRING,
    user_id STRING,
    device_id STRING
) with (
    'connector'='jdbc',
    'url'='jdbc:mysql://hosts:3306/d_bigdata',
    'table-name'='flink_sql_test',
    'username'='',
    'password'='',
    'sink.buffer-flush.max-rows'='100'
);

-- Business logic
insert into cloud_behavior_sink select * from cloud_behavior_source;
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<type VARCHAR(2147483647), value VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<type VARCHAR(2147483647), value VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
    at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 11 more

*Compiled by volunteers, from flink
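Your source and sink tables do not even have the same number of fields. The table schema of the query in the INSERT statement has to match the schema of the sink table. For example, you could write the data with the following SQL:

```sql
insert into cloud_behavior_sink
select
    operation,
    operation_channel,
    ip,
    lat,
    lng,
    user_id,
    device_id
from cloud_behavior_source;
```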
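To make the mismatch concrete, here is a minimal sketch in plain Python. It is not Flink's own validation logic: it only compares the column names copied from the "Query schema" and "Sink schema" parts of the error message, then builds a projection that selects exactly the sink columns in sink order. The helper names `extra_columns` and `build_insert` are made up for this illustration.

```python
# Column names copied from the "Query schema" part of the error message
# (this is what `select *` on cloud_behavior_source produces).
query_columns = [
    "operation", "operation_channel", "time", "ip", "lat", "lng",
    "user_id", "device_id", "imei", "targets", "product_name",
    "product_version", "product_vendor", "platform", "platform_version",
    "languaage", "locale", "other_para",
]

# Column names copied from the "Sink schema" part of the error message.
sink_columns = [
    "operation", "operation_channel", "ip", "lat", "lng", "user_id", "device_id",
]


def extra_columns(query, sink):
    """Return the query columns that the sink table does not declare."""
    declared = set(sink)
    return [name for name in query if name not in declared]


def build_insert(source_table, sink_table, sink):
    """Build an INSERT that selects exactly the sink columns, in sink order."""
    select_list = ",\n    ".join(sink)
    return (
        f"insert into {sink_table}\n"
        f"select\n    {select_list}\n"
        f"from {source_table};"
    )


extras = extra_columns(query_columns, sink_columns)
statement = build_insert("cloud_behavior_source", "cloud_behavior_sink", sink_columns)

print(f"query has {len(query_columns)} columns, sink has {len(sink_columns)}")
print("not declared in the sink:", ", ".join(extras))
print(statement)
```

The query yields 18 columns while the sink declares 7, so `select *` cannot fit the sink. The generated statement is the same projection as the SQL in the answer above.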