Question 1: How do I fix this error when writing to Hudi with Flink CDC?
How do I fix this error when writing to Hudi with Flink CDC?
...own Source)
at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1082)
at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1898)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1700)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)
81261 [mini-cluster-io-thread-3] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
81314 [flink-metrics-8] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
81314 [flink-metrics-8] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
81322 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
81324 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
81328 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:57196
81332 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
81332 [ForkJoinPool.commonPool-worker-4] WARN org.apache.flink.client.program.PerJobMiniClusterFactory - Shutdown of MiniCluster failed.
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_3.
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:629)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage$$$capture(ActorCell.scala:580)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: org.apache.flink.runtime.jobmaster.JobMasterException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:391)
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623)
... 22 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90)
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585)
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965)
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882)
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389)
... 26 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /test/dept/.hoodie/hoodie.properties could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2276)
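Reference answer:
Try restarting Hadoop. If that does not help, look at the last Caused by: 3 DataNodes are running, but all 3 were excluded for this write, so the NameNode had no node left to place the block for /test/dept/.hoodie/hoodie.properties on. A DataNode usually ends up excluded when the HDFS client fails to connect to it. The log shows the job running in a local MiniCluster (for example from an IDE), so check that this machine can reach every DataNode. If the DataNodes register with internal IP addresses that your machine cannot reach, setting dfs.client.use.datanode.hostname=true in the client's Hadoop configuration is a common fix.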
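The error says that all running DataNodes were excluded for the write, so one quick check is whether the machine running the job can open a TCP connection to each DataNode at all. The following is a minimal sketch in Python, not part of Hadoop or Flink; the function names are hypothetical, and it assumes the DataNodes use the Hadoop 3.x default data-transfer port 9866 (the value of dfs.datanode.address; Hadoop 2.x defaults to 50010).

```python
import socket


def datanode_reachable(host: str, port: int = 9866, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Assumption: 9866 is the Hadoop 3.x default for dfs.datanode.address;
    Hadoop 2.x clusters default to 50010.
    """
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections and timeouts.
        return False


def unreachable_datanodes(hosts, port: int = 9866, timeout: float = 3.0) -> list:
    """Return the hosts from `hosts` that cannot be reached on `port`."""
    return [h for h in hosts if not datanode_reachable(h, port, timeout)]
```

For example, pass the DataNode host names or addresses listed by `hdfs dfsadmin -report` to `unreachable_datanodes()`. Any host it returns is one the HDFS client running the Flink job would most likely fail to write to as well.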
For more answers to this question, see:
https://developer.aliyun.com/ask/590890
Question 2: What causes this Flink error?
INTERNAL: Occur VvpException or FlinkSQLException during submitting preview: org.apache.flink.table.api.ValidationException: SQL validation failed. Unable to create a source for reading table 'vvp.default.source_table'.
The cause is following: Cannot discover a connector using option: 'connector'='mysql-cdc'
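What causes this Flink error?
Reference answer:
Change the connector option to 'connector' = 'mysql'. The message means that no connector named mysql-cdc could be found in this environment (a VVP deployment, judging by the VvpException in the message), where the MySQL CDC source is provided under the name mysql.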
For more answers to this question, see:
https://developer.aliyun.com/ask/592132
Question 3: Problems syncing MySQL data to StarRocks (SR) with Flink CDC 3.0
Environment:
MySQL: 5.7.x
StarRocks: 2.5
Flink: 1.18.0
Flink CDC: 3.0
The configuration file is as follows:
source:
  type: mysql
  hostname: xxx
  port: 3306
  username: xxx
  password: xxx
  tables: xxx_db.\.*
  server-id: 1
  server-time-zone: Asia/Shanghai

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://xxx:9030
  load-url: xxx:8030
  username: root
  password: ""
  table.create.properties.replication_num: 1

pipeline:
  name: Sync MySQL Database to StarRocks
  parallelism: 1
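DDL issues:
- Adding a column: the change is synced, but the job then fails and only recovers after it is cancelled and restarted.
- Dropping a column: the change is synced, with the same problem as above.
- Changing a column type: the change is not synced.
Error log excerpt for the first issue (adding a column):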
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at com.ververica.cdc.runtime.operators.schema.SchemaOperator.sendRequestToCoordinator(SchemaOperator.java:123)
... 30 more
Caused by: java.lang.IllegalArgumentException: temp of AddColumnEvent is already existed
at com.ververica.cdc.common.utils.SchemaUtils.applyAddColumnEvent(SchemaUtils.java:73)
at com.ververica.cdc.common.utils.SchemaUtils.applySchemaChangeEvent(SchemaUtils.java:53)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaManager.applySchemaChange(SchemaManager.java:113)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:102)
at com.ververica.cdc.runtime.operators.schema.coordinator.SchemaRegistry.handleCoordinationRequest(SchemaRegistry.java:157)
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverCoordinationRequestToCoordinator(DefaultOperatorCoordinatorHandler.java:143)
at org.apache.flink.runtime.scheduler.SchedulerBase.deliverCoordinationRequestToCoordinator(SchedulerBase.java:1070)
at org.apache.flink.runtime.jobmaster.JobMaster.sendRequestToCoordinator(JobMaster.java:616)
at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
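DML issue:
- Cancel the sync job, delete a row in MySQL, then restart the job: the delete is not propagated to StarRocks. The same procedure with an inserted row does sync correctly.
Reference answer: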
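This may be caused by a problem in how Flink CDC handles dropped columns. You can try the following:
- Check your Flink CDC version and make sure it is the latest. If it is not, upgrade and see whether the problem is resolved.
- If the problem persists, open an issue in the Flink CDC GitHub repository describing it in detail, so the maintainers can look into it and fix it in a later release.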
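- For the DML issue: when a job is cancelled and then started fresh instead of being restored from a savepoint or checkpoint, the MySQL source takes a new initial snapshot and then reads the binlog from the current position. Rows inserted while the job was stopped are part of that snapshot and reach StarRocks, but a DELETE executed during that window is never read, so the row stays in StarRocks. As a workaround, stop the job with a savepoint and restore from it, so that reading resumes from the saved binlog offset. Rows already missed this way have to be removed from StarRocks manually.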
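The following toy model illustrates why a row deleted in MySQL while the sync job is stopped can remain in StarRocks after the job is started again without restoring a savepoint or checkpoint, while newly inserted rows still arrive. It is a simplified sketch under stated assumptions, not Flink CDC code: the MySQL table and the StarRocks primary-key table are plain dicts, the binlog is a list of events, and all function names are hypothetical.

```python
# Toy model only: tables are dicts keyed by primary key,
# binlog events are (op, key, value) tuples.


def snapshot(source: dict, sink: dict) -> None:
    """Initial snapshot: upsert every row that currently exists in the source.

    Rows that no longer exist in the source are simply not emitted,
    so nothing gets deleted in the sink.
    """
    for key, value in source.items():
        sink[key] = value


def apply_binlog(events: list, sink: dict) -> None:
    """Replay change events in order, as a resumed binlog reader would."""
    for op, key, value in events:
        if op == "delete":
            sink.pop(key, None)
        else:  # "insert" and "update" are upserts on a primary-key table
            sink[key] = value


def restart(source: dict, sink: dict, missed_events: list, from_savepoint: bool) -> dict:
    """Simulate restarting the job after changes made while it was stopped."""
    if from_savepoint:
        # Resume from the saved binlog offset: the missed events are read.
        apply_binlog(missed_events, sink)
    else:
        # Fresh start: new snapshot, then binlog from the current position,
        # so events from the downtime window are never read.
        snapshot(source, sink)
    return sink
```

In this model, deleting row 2 and inserting row 3 while the job is down, then restarting without a savepoint, leaves row 2 in the sink while row 3 still appears, which matches the behavior described in the question. Restarting from the savepoint applies both changes.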
For more answers to this question, see:
https://developer.aliyun.com/ask/591687
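Question 4: To use Elasticsearch with Flink, is adding the ES connector dependency to the pom enough?
To use ES with Flink, is adding the ES connector dependency to the pom all that is needed? I packaged the job with the shade plugin and included the connector, so why does Flink throw ClassNotFoundException when it runs on the cluster? Does anyone know?
Reference answer: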
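When using the Elasticsearch (ES) connector in Flink, adding its dependency to pom.xml is not always enough. Consider the following:
- Packaging: when you build a shaded jar, make sure every required dependency actually ends up in it. A ClassNotFoundException on the cluster often means some needed classes were not included in the packaged jar, for example because the connector dependency is declared with provided scope.
- Version conflicts: if the cluster already has a different version of the Elasticsearch connector, it can conflict with the version your project uses. Make sure the connector version is compatible with your Flink version and cluster environment.
- Environment configuration: make sure the Elasticsearch service is installed and configured correctly. Note that a stopped or misconfigured Elasticsearch service usually shows up as connection errors at runtime rather than as a ClassNotFoundException.
- Dependency management: if you use PyFlink, pay particular attention to Python dependency management. You may need to add dependencies that match your Elasticsearch version and make sure they are available when the job runs on the cluster.
- Cluster environment: check whether the cluster's lib directory contains jars whose versions conflict with those in your application, and resolve any conflicts to avoid class-loading problems.
- Documentation: see the official Flink documentation on packaging a program together with the libraries it needs for cluster execution, and on adding JAR dependencies to PyFlink jobs.
In short, resolving a ClassNotFoundException usually means checking packaging, version compatibility, environment configuration, and dependency management together, so that the Elasticsearch connector works correctly on the Flink cluster.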
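To confirm the packaging point, you can check whether the class named in the ClassNotFoundException is actually inside the shaded jar. A jar is a zip archive, so Python's zipfile module can inspect it. This is a minimal sketch; the function names are hypothetical, and the jar path and class name you pass in are placeholders for your own build output and the class from your stack trace.

```python
import zipfile


def class_entry(class_name: str) -> str:
    """Map a fully qualified class name to its path inside a jar."""
    return class_name.replace(".", "/") + ".class"


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar (a zip archive) contains the given class file."""
    with zipfile.ZipFile(jar_path) as jar:
        return class_entry(class_name) in set(jar.namelist())


def entries_under(jar_path: str, package: str) -> list:
    """List class files under a package prefix, sorted by path."""
    prefix = package.replace(".", "/") + "/"
    with zipfile.ZipFile(jar_path) as jar:
        return sorted(
            name for name in jar.namelist()
            if name.startswith(prefix) and name.endswith(".class")
        )
```

If `jar_contains_class` returns False for the missing class, the class was not shaded in and the packaging configuration is the place to look. If it returns True, the cause is more likely related to class loading or conflicting copies on the cluster, for example in Flink's lib directory. If the shade plugin relocates packages, search for the relocated name, for example with `entries_under`.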
For more answers to this question, see:
https://developer.aliyun.com/ask/591618
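Question 5: Does anyone know what causes this Flink CDC error?
Does anyone know what causes this Flink CDC error?
Reference answer:
It was put in the wrong directory.
For more answers to this question, see: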