Type-Cast Problems with the Flink JDBC SQL Connector


Background

Flink 1.13

Recently, while writing a Flink SQL job, I ran into java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long.

Analysis

Here is the DDL that triggered the error:

CREATE TABLE `xxx` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  -- connector options omitted
);

The full stack trace:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
  at JoinTableFuncCollector$9.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
  at LookupFunction$4.flatMap(Unknown Source)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$67.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
  at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
  at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
  at StreamExecCorrelate$27.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$17.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

At its root, this is a type-mapping mismatch between MySQL and Flink SQL: the Java object the JDBC driver returns for a column does not match the Java class that Flink's generated code expects for the declared column type.
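
The failing call at the top of the stack, GenericRowData.getLong, does nothing more than cast the stored field object to Long. A minimal standalone sketch (assuming the flink-table dependencies are on the classpath; this is an illustration, not the job's actual code) reproduces the exception:

import org.apache.flink.table.data.GenericRowData;

public class CastRepro {
    public static void main(String[] args) {
        // Autoboxing makes field 0 a java.lang.Integer -- the same kind of
        // object the JDBC lookup ends up storing for this column.
        GenericRowData row = GenericRowData.of(1);

        // The code generated for a BIGINT column calls getLong(0), which
        // casts the stored Object to Long -- and an Integer is not a Long:
        long v = row.getLong(0);
        // -> java.lang.ClassCastException:
        //    java.lang.Integer cannot be cast to java.lang.Long
        System.out.println(v);
    }
}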

The columns of the MySQL table being looked up:

+-----------------------+---------------------+------+-----+---------+----------------+
| Field                 | Type                | Null | Key | Default | Extra          |
+-----------------------+---------------------+------+-----+---------+----------------+
| merchantId            | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId                | bigint(20)          | NO   | MUL | NULL    |                |
| status                | tinyint(1) unsigned | NO   |     | 1       |                |
+-----------------------+---------------------+------+-----+---------+----------------+

Compare this with the Flink JDBC connector's data type mapping. The integer-type rows of that table (MySQL type on the left, Flink SQL type on the right) are:

+------------------------------------+-----------------+
| MySQL type                         | Flink SQL type  |
+------------------------------------+-----------------+
| TINYINT                            | TINYINT         |
| SMALLINT, TINYINT UNSIGNED         | SMALLINT        |
| INT, MEDIUMINT, SMALLINT UNSIGNED  | INT             |
| BIGINT, INT UNSIGNED               | BIGINT          |
| BIGINT UNSIGNED                    | DECIMAL(20, 0)  |
+------------------------------------+-----------------+

Note: a MySQL column of type INT UNSIGNED must be mapped to BIGINT on the Flink side, not INT, because its values can exceed the range of a signed 32-bit integer.

Mapping it to INT produces the mirror-image error:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
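
This widening is visible directly at the JDBC level: MySQL Connector/J returns each unsigned column as the next larger Java type that can hold its full range. A quick probe (the connection URL, credentials, and table name below are hypothetical placeholders) prints the classes the driver actually hands back:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UnsignedProbe {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT merchantId, status FROM dim_merchant LIMIT 1")) {
            if (rs.next()) {
                // int(11) unsigned can exceed Integer.MAX_VALUE, so the
                // driver returns java.lang.Long -> declare BIGINT in Flink.
                System.out.println(rs.getObject("merchantId").getClass());
                // tinyint(1) unsigned came back as java.lang.Integer in this
                // setup -- exactly what the ClassCastException above shows.
                System.out.println(rs.getObject("status").getClass());
            }
        }
    }
}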

Fix

Correcting the type of the mismatched column fixes the problem. merchantId (int(11) unsigned) correctly stays BIGINT per the mapping above; only status (tinyint(1) unsigned) needed to change:

CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  -- connector options omitted
);
           ||
           \/
CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  INT
) WITH (
  -- connector options omitted
);
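
To see why INT works, here is the counterpart of the earlier sketch: with the column declared INT, the generated code calls getInt instead of getLong, and the stored Integer unboxes cleanly (again a standalone illustration, not the job's code):

import org.apache.flink.table.data.GenericRowData;

public class CastFixed {
    public static void main(String[] args) {
        GenericRowData row = GenericRowData.of(1); // still an Integer field
        int v = row.getInt(0);                     // (Integer) cast -- fine
        System.out.println(v);                     // prints 1
    }
}

Note that the connector's documented mapping lists TINYINT UNSIGNED as SMALLINT; INT also reads cleanly in this case because the lookup stored the value as a java.lang.Integer.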

