Background
Flink 1.3
While writing some Flink SQL recently, I ran into java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long.
Problem
Analysis
Here is the DDL that triggered the error:
CREATE TABLE `xxx` (
  `merchantId` BIGINT,
  `userId`     BIGINT,
  `status`     BIGINT
) WITH ( );
The full exception stack trace:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
    at JoinTableFuncCollector$9.collect(Unknown Source)
    at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
    at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
    at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
    at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
    at LookupFunction$4.flatMap(Unknown Source)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
    at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at StreamExecCalc$67.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
    at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
    at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
    at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
    at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
    at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
    at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
    at StreamExecCorrelate$27.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at StreamExecCalc$17.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
This is a type-mapping mismatch between the MySQL column types and the types declared in the Flink SQL DDL:
The corresponding MySQL columns are:
+------------+---------------------+------+-----+---------+----------------+
| Field      | Type                | Null | Key | Default | Extra          |
+------------+---------------------+------+-----+---------+----------------+
| merchantId | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId     | bigint(20)          | NO   | MUL | NULL    |                |
| status     | tinyint(1) unsigned | NO   |     | 1       |                |
+------------+---------------------+------+-----+---------+----------------+
Now compare this against the data type mapping table in the Flink JDBC connector documentation.
Note: a MySQL INT UNSIGNED column must be mapped to BIGINT on the Flink side, not INT.
Mapping it to INT instead produces the mirror-image error:
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
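Both errors come down to plain Java boxing semantics: GenericRowData stores each field as an Object, and getLong is an unchecked cast to java.lang.Long, which fails if the JDBC driver actually handed back an Integer. The sketch below (hypothetical class name CastDemo, plain JDK with no Flink dependency) mimics that cast:

```java
// Minimal reproduction of the failure mode, assuming only that
// GenericRowData.getLong boils down to an unchecked cast to Long
// (as the stack trace above suggests).
public class CastDemo {

    // Mimics GenericRowData.getLong: fields are stored as Object,
    // and reading a BIGINT column casts to java.lang.Long.
    static long asLong(Object fieldValue) {
        return (Long) fieldValue;
    }

    public static void main(String[] args) {
        // The JDBC driver returns the tinyint/int column value boxed
        // as java.lang.Integer, while the DDL declared it as BIGINT.
        Object jdbcValue = Integer.valueOf(42);
        try {
            asLong(jdbcValue);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the stack trace above");
        }

        // Declaring the column as INT instead means the runtime reads it
        // with a cast to Integer, which succeeds.
        int ok = (Integer) jdbcValue;
        System.out.println(ok);
    }
}
```

A boxed Integer can never be cast to Long (they are sibling classes, not a widening primitive conversion), so the DDL type has to match the Java class the driver actually returns.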
Solution
Correcting the mismatched column type fixes the error:
Before:

CREATE TABLE `dim_merchant` (
  `merchantId` BIGINT,
  `userId`     BIGINT,
  `status`     BIGINT
) WITH ( );

After:

CREATE TABLE `dim_merchant` (
  `merchantId` BIGINT,
  `userId`     BIGINT,
  `status`     INT
) WITH ( );