LINK JDBC SQL Connector遇到的类型转换问题

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: LINK JDBC SQL Connector遇到的类型转换问题

背景

Flink 1.3

最近在写Flink Sql的时候,遇到了java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long 问题

分析

直接上报错的sql,如下:

CREATE TABLE `xxx` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
);

具体的问题堆栈如下:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
  at JoinTableFuncCollector$9.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
  at LookupFunction$4.flatMap(Unknown Source)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
  at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
  at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$67.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
  at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
  at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
  at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
  at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
  at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
  at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
  at StreamExecCorrelate$27.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at StreamExecCalc$17.processElement(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
  at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

其实这是mysql和flink SQL在做字段类型映射的时候,会出现的类型匹配问题:

查看一下对应mysql字段如下:

+-----------------------+---------------------+------+-----+---------+----------------+
| Field                 | Type                | Null | Key | Default | Extra          |
+-----------------------+---------------------+------+-----+---------+----------------+
| merchantId            | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId                | bigint(20)          | NO   | MUL | NULL    |                |
| status                | tinyint(1) unsigned | NO   |     | 1       |                |

再参考Flink Data Type Mapping,如下:

image.png

注意: 对于MYSQL类型是INT UNSIGNED 的字段,映射到FLINk应该为BIGINT类型,而不是INT类型

否则会报如下错误:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer

解决

把对一个的类型修改一下可以了:

CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
);
           ||
           \/
CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  INT
) WITH (
);


相关文章
|
3月前
|
SQL Java 数据库连接
[SQL]SQL注入与SQL执行过程(基于JDBC)
[SQL]SQL注入与SQL执行过程(基于JDBC)
52 0
|
5月前
|
Java 数据库连接 数据库
,从Flink 1.13版本开始,Flink Connector JDBC已经被移到了一个独立的仓库
,从Flink 1.13版本开始,Flink Connector JDBC已经被移到了一个独立的仓库
210 1
|
7月前
|
SQL Java 数据库连接
JSP商品进出库管理系统myeclipse开发sql数据库bs框架java编程jdbc
JSP 商品进出库管理系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为SQLSERVER2008,使用java语言开发,系统主要采用B/S模式开发。
55 0
|
7月前
|
SQL Java 数据库连接
JSP婚纱影楼管理系统myeclipse开发sql数据库bs框架java编程jdbc
JSP 婚纱影楼管理系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为SQLSERVER2008,使用java语言开发,系统主要采用B/S模式开发。
51 0
|
7月前
|
SQL 存储 Java
JDBC Statement:执行 SQL 语句的重要接口
在Java应用程序中,与数据库进行交互是一项常见的任务。为了执行数据库操作,我们需要使用JDBC(Java Database Connectivity)来建立与数据库的连接并执行SQL语句。Statement接口是JDBC中的一个重要接口,它用于执行SQL语句并与数据库进行交互。本文将详细介绍Statement接口的使用,包括如何创建Statement对象、执行SQL语句、处理结果等内容。
110 0
|
2月前
|
Java 数据库连接 数据库
Flink Connector JDBC已经被移到了一个独立的仓库
【2月更文挑战第23天】Flink Connector JDBC已经被移到了一个独立的仓库
14 1
|
7月前
|
SQL Java 数据库连接
JSP网上相亲交友系统myeclipse开发sql数据库bs框架java编程jdbc
JSP 网上相亲交友系统是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,开发环境为TOMCAT7.0,Myeclipse8.5开发,数据库为sqlserver2008,使用java语言开发,系统主要采用B/S模式开发。
71 0
|
3月前
|
SQL Java 数据库连接
Flink扩展问题之jdbc connector扩展失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
3月前
|
关系型数据库 MySQL Java
Flink cdc报错问题之使用jdbc connector报错如何解决
Flink CDC报错指的是使用Apache Flink的Change Data Capture(CDC)组件时遇到的错误和异常;本合集将汇总Flink CDC常见的报错情况,并提供相应的诊断和解决方法,帮助用户快速恢复数据处理任务的正常运行。
|
4月前
|
SQL Java 关系型数据库
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
35 0