
How do I solve this PolarDB problem?

We use flink-cdc to capture the binlog of our PolarDB instance. Whenever a table carries an index on a JSON-typed column, the job fails with the error below; the same setup against native MySQL 8 or other cloud databases does not reproduce the problem. 2024-03-04 11:02:26
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:263)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:185)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:147)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1553)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1064)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
... 1 more
Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1709519278000, eventType=TABLE_MAP, serverId=27570245, headerLength=19, dataLength=157, nextPosition=18249559, flags=0}
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1488)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1709519278000, eventType=TABLE_MAP, serverId=27570245, headerLength=19, dataLength=157, nextPosition=18249559, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:313)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:237)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)
... 3 more
Caused by: java.io.IOException: Stumbled upon long even though int expected
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readPackedInteger(ByteArrayInputStream.java:157)
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventMetadataDeserializer.deserialize(TableMapEventMetadataDeserializer.java:67)
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:59)
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:37)
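
For context, a minimal sketch of the kind of Flink CDC job described above (Flink CDC 2.x API); the endpoint, credentials, and database/table names are placeholders, not taken from the original report:

// Minimal Flink CDC job reading the PolarDB binlog; per the report, it fails
// as above once a captured table carries an index involving a JSON column.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class PolarDbBinlogJob {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("polardb-host")        // placeholder PolarDB endpoint
                .port(3306)
                .databaseList("mydb")            // placeholder database
                .tableList("mydb.my_table")      // placeholder table with the JSON index
                .username("user")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "PolarDB CDC Source")
           .print();
        env.execute("polardb-binlog-cdc");
    }
}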

三分钟热度的鱼 2024-03-13 21:43:38
2 answers

  • This problem occurs because Flink CDC hits an exception while processing PolarDB's binlog for a table that carries a JSON-typed index; Flink CDC currently does not handle this kind of index. You can try the following:

    1. Check the table structure and make sure no index is built on a JSON column. If possible, replace the JSON-based index with one on another type, for example VARCHAR or TEXT (see the sketch after this list).

    2. If you cannot change the table structure, consider processing the PolarDB binlog with another tool or library, such as Debezium itself, so that you can customize the data conversion and handling logic and avoid the incompatibility with Flink CDC.

    3. Report the problem to the Flink CDC project so it can be fixed in a future release: open an issue in the Flink CDC GitHub repository describing what you hit and any suggested fix.
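
    As a hedged illustration of point 1, the sketch below first lists all functional indexes and their expressions (MySQL 8.0.13+ exposes the index expression in INFORMATION_SCHEMA.STATISTICS) so the JSON-based ones can be spotted, then shows one possible rewrite: index a generated VARCHAR column instead of the JSON expression. All table, column, and index names are placeholders, and whether this actually avoids the binlog deserialization error on PolarDB has not been verified:

    // Sketch: inspect functional indexes, then replace a JSON-expression index
    // with an index on a generated VARCHAR column (placeholder names throughout).
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class JsonIndexCheck {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://polardb-host:3306/mydb"; // placeholder endpoint
            try (Connection conn = DriverManager.getConnection(url, "user", "password")) {
                // 1. Functional indexes show up with a non-NULL EXPRESSION (MySQL 8.0.13+).
                String findSql =
                    "SELECT TABLE_NAME, INDEX_NAME, EXPRESSION " +
                    "FROM INFORMATION_SCHEMA.STATISTICS " +
                    "WHERE TABLE_SCHEMA = ? AND EXPRESSION IS NOT NULL";
                try (PreparedStatement ps = conn.prepareStatement(findSql)) {
                    ps.setString(1, "mydb");
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.printf("table=%s index=%s expr=%s%n",
                                    rs.getString(1), rs.getString(2), rs.getString(3));
                        }
                    }
                }

                // 2. One possible rewrite: extract the JSON field into a stored generated
                //    VARCHAR column, index that column, and drop the old functional index.
                try (Statement st = conn.createStatement()) {
                    st.execute("ALTER TABLE my_table " +
                            "ADD COLUMN order_no VARCHAR(64) " +
                            "GENERATED ALWAYS AS (json_unquote(json_extract(payload, '$.orderNo'))) STORED");
                    st.execute("ALTER TABLE my_table ADD INDEX idx_order_no (order_no)");
                    st.execute("ALTER TABLE my_table DROP INDEX idx_payload_order_no"); // old functional index (placeholder name)
                }
            }
        }
    }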

    2024-03-14 14:04:46
  • Which MySQL version did you compare against? Different MySQL versions differ in their binlog format. The error is most likely Flink CDC failing to handle that difference; you can file an issue with the Flink CDC community, and a related issue already exists: https://github.com/ververica/flink-cdc-connectors/issues/2192 . This answer was compiled from the DingTalk group "PolarDB 专家面对面 - 网络&连接&线程池功能".
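
    If neither changing the schema nor waiting for a fix is an option, one possible stop-gap, sketched below, is to pass Debezium's event.deserialization.failure.handling.mode property through the Flink CDC source builder so that binlog events which cannot be deserialized are logged and skipped instead of stopping the connector. The property name comes from the Debezium MySQL connector; verify it against the Debezium version bundled with your Flink CDC release, and note that skipped events mean lost changes for the affected table:

    // Sketch: build a MySqlSource that tolerates deserialization failures.
    // Endpoint, credentials, and table names are placeholders.
    import java.util.Properties;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class TolerantSourceFactory {
        public static MySqlSource<String> build() {
            Properties debeziumProps = new Properties();
            // Assumed Debezium MySQL connector property: log and skip events that
            // fail to deserialize rather than failing the connector.
            debeziumProps.setProperty("event.deserialization.failure.handling.mode", "warn");

            return MySqlSource.<String>builder()
                    .hostname("polardb-host")      // placeholder PolarDB endpoint
                    .port(3306)
                    .databaseList("mydb")
                    .tableList("mydb.my_table")
                    .username("user")
                    .password("password")
                    .debeziumProperties(debeziumProps)
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();
        }
    }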

    2024-03-13 22:51:20
