LINK JDBC SQL Connector遇到的类型转换问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: LINK JDBC SQL Connector遇到的类型转换问题

背景

Flink 1.3

最近在写Flink Sql的时候,遇到了java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long 问题

分析

直接上报错的sql,如下:

CREATE TABLE `xxx` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
);

具体的问题堆栈如下:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
  at JoinTableFuncCollector$9.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
  at LookupFunction$4.flatMap(Unknown Source)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$67.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
  at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
  at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
  at StreamExecCorrelate$27.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$17.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

其实这是mysql和flink SQL在做字段类型映射的时候,会出现的类型匹配问题:

查看一下对应mysql字段如下:

+-----------------------+---------------------+------+-----+---------+----------------+
| Field                 | Type                | Null | Key | Default | Extra          |
+-----------------------+---------------------+------+-----+---------+----------------+
| merchantId            | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId                | bigint(20)          | NO   | MUL | NULL    |                |
| status                | tinyint(1) unsigned | NO   |     | 1       |                |

再参考Flink Data Type Mapping,如下:

image.png

注意: 对于MYSQL类型是INT UNSIGNED 的字段,映射到FLINk应该为BIGINT类型,而不是INT类型

否则会报如下错误:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer

解决

把对一个的类型修改一下可以了:

CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
);
           ||
           \/
CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  INT
) WITH (
);


相关文章
|
1月前
|
SQL Java 数据库连接
[SQL]SQL注入与SQL执行过程(基于JDBC)
本文介绍了SQL注入的概念及其危害,通过示例说明了恶意输入如何导致SQL语句异常执行。同时,详细解释了SQL语句的执行过程,并提出了使用PreparedStatement来防止SQL注入的方法。
44 1
|
2月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
53 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
97 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
43 0
|
2月前
|
SQL 分布式计算 关系型数据库
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
56 0
|
4月前
|
SQL Java 数据库连接
JDBC连接SQL Server2008 完成增加、删除、查询、修改等基本信息基本格式及示例代码
这篇文章提供了使用JDBC连接SQL Server 2008数据库进行增加、删除、查询和修改操作的基本步骤和示例代码。
|
5月前
|
SQL Cloud Native 关系型数据库
云原生数据仓库使用问题之控制JDBC方式请求的SQL大小限制的参数是什么
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
6月前
|
SQL XML Java
后端数据库开发JDBC编程Mybatis之用基于XML文件的方式映射SQL语句实操
后端数据库开发JDBC编程Mybatis之用基于XML文件的方式映射SQL语句实操
77 3
|
5月前
|
SQL Java 关系型数据库
Java面试题:描述JDBC的工作原理,包括连接数据库、执行SQL语句等步骤。
Java面试题:描述JDBC的工作原理,包括连接数据库、执行SQL语句等步骤。
72 0
|
6月前
|
SQL Java 关系型数据库
Java与数据库连接技术JDBC关键核心之PreparedStatement以及SQL注入演示解决和原理
Java与数据库连接技术JDBC关键核心之PreparedStatement以及SQL注入演示解决和原理
43 0