Flink version: 1.18
Flink CDC version: 3.0.0
Database and its version: StarRocks 2.5.8
Minimal reproduce steps: drop a column, then immediately insert or delete data.
2024-01-22 17:36:22,556 INFO com.starrocks.connector.flink.catalog.StarRocksCatalog [] - Success to drop columns from user.tb_user, duration: 27261ms, sql: ALTER TABLE user.tb_user DROP COLUMN price PROPERTIES ("timeout" = "1800");
2024-01-22 17:36:22,572 INFO com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier [] - Successful to apply drop column, event: DropColumnEvent{tableId=user.tb_user, droppedColumns=[price BIGINT]}
2024-01-22 17:36:44,638 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PostPartition -> Sink Writer: StarRocks Sink -> Sink Committer: StarRocks Sink (1/1) (9b0c5f9677364d623c9b394f4bb350f9_0deb1b26a3d9eb3c8f0c11f7110b2903_0_0) switched from RUNNING to FAILED on localhost:44944-2b3027 @ localhost (dataPort=41166).
java.lang.IllegalArgumentException: null
at com.ververica.cdc.common.utils.Preconditions.checkArgument(Preconditions.java:106) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serializeRecord(EventRecordSerializationSchema.java:131) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.applyDataChangeEvent(EventRecordSerializationSchema.java:119) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serialize(EventRecordSerializationSchema.java:78) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema.serialize(EventRecordSerializationSchema.java:45) ~[?:?]
at com.starrocks.connector.flink.table.sink.v2.StarRocksWriter.write(StarRocksWriter.java:139) ~[?:?]
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-dist-1.18.0.jar:1.18.0]
at com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:154) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.0.jar:1.18.0]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
After the job restored from the previous checkpoint state:
2024-01-22 17:36:46,406 ERROR com.starrocks.connector.flink.catalog.StarRocksCatalog [] - Failed to drop columns from user.tb_user, sql: ALTER TABLE user.tb_user DROP COLUMN price PROPERTIES ("timeout" = "1800");
2024-01-22 17:36:46,424 INFO com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier [] - Successful to apply drop column, event: DropColumnEvent{tableId=user.tb_user, droppedColumns=[price BIGINT]}, and ignore the alter exception
com.starrocks.connector.flink.catalog.StarRocksCatalogException: Failed to drop columns from user.tb_user
at com.starrocks.connector.flink.catalog.StarRocksCatalog.alterDropColumns(StarRocksCatalog.java:331) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applyDropColumn(StarRocksMetadataApplier.java:197) ~[?:?]
at com.ververica.cdc.connectors.starrocks.sink.StarRocksMetadataApplier.applySchemaChange(StarRocksMetadataApplier.java:75) ~[?:?]
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:82) ~[?:?]
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.flushSuccess(SchemaRegistryRequestHandler.java:149) ~[?:?]
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleEventFromOperator(SchemaRegistry.java:123) ~[?:?]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:204) ~[flink-dist-1.18.0.jar:1.18.0]
A `Preconditions.checkArgument` check failed while serializing a record, i.e. an invalid argument reached the method; note the `null` in `IllegalArgumentException: null` is the exception's missing message, not a value from your data.
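Why the log shows `: null` can be illustrated with a minimal sketch of the single-argument `checkArgument` pattern (plain Java, independent of the actual CDC classes; the demo class and condition are hypothetical):

```java
public class CheckArgumentDemo {
    // Mirrors the one-argument Preconditions.checkArgument style:
    // a false condition throws IllegalArgumentException with no message.
    static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    public static void main(String[] args) {
        try {
            // e.g. a record whose columns no longer match the sink's schema
            checkArgument(false);
        } catch (IllegalArgumentException e) {
            // getMessage() is null, so the logger prints "IllegalArgumentException: null"
            System.out.println("IllegalArgumentException: " + e.getMessage());
        }
    }
}
```

This matches the first stack trace: the check at `EventRecordSerializationSchema.serializeRecord` fails, but the exception carries no message, so the log gives no detail about which argument was invalid.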
Make sure Flink CDC and the StarRocks connector are configured correctly, and that the proper procedure is followed when dropping a column and then writing new data.

This error is caused by inserting new data immediately after the column is dropped. Specifically, the exception is thrown in the `serializeRecord` method of `com.ververica.cdc.connectors.starrocks.sink.EventRecordSerializationSchema`. Check the job logic and make sure the column drop is handled properly, for example by waiting for the schema change to complete or by reinitializing the sync task.