开发者社区> 问答> 正文

flinksql kafka 插入mysql 的insert语法问题和唯一主键的问题

我使用的flink 1.11.0版本 代码如下 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv); tableEnvironment.executeSql(" " + " CREATE TABLE mySource ( " + " a bigint, " + " b bigint " + " ) WITH ( " + " 'connector.type' = 'kafka', " + " 'connector.version' = 'universal', " + " 'connector.topic' = 'mytesttopic', " + " 'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " + " 'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " + " 'connector.properties.group.id' = 'flink-test-cxy', " + " 'connector.startup-mode' = 'latest-offset', " + " 'format.type' = 'json' " + " ) "); tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " + " id bigint, " + " game_id varchar, " + " PRIMARY KEY (id) NOT ENFORCED " + " ) " + " with ( " + " 'connector.type' = 'jdbc', " + " 'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " + " 'connector.username' = 'root' , " + " 'connector.password' = 'root', " + " 'connector.table' = 'mysqlsink' , " + " 'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " + " 'connector.write.flush.interval' = '2s', " + " 'connector.write.flush.max-rows' = '300' " + " )"); tableEnvironment.executeSql("insert into mysqlsink (id,game_id) values (select a,cast(b as varchar) b from mySource)");

问题一 : 上面的insert语句会出现如下错误 Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'

问题二 : 如果insert改成 tableEnvironment.executeSql("insert into mysqlsink select a,cast(b as varchar) b from mySource"); 是可以运行的,但是出现唯一主键重复时会报错 Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'*来自志愿者整理的flink邮件归档

展开
收起
雪哥哥 2021-12-06 22:00:26 1834 0
1 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
One Box: 解读事务与分析一体化数据库 HybridDB for MySQL 立即下载
One Box:解读事务与分析一体化数据库HybridDB for MySQL 立即下载
如何支撑HTAP场景-HybridDB for MySQL系统架构和技术演进 立即下载

相关镜像