Question 1: Flink CDC fails when started from a jar
It runs fine in IDEA, but when started from the jar it reports the following error:
2024-02-28 16:38:09.568 WARN 59419 --- [lt-dispatcher-4] .a.f.r.s.t.DefaultDelegationTokenManager : No tokens obtained so skipping notifications
2024-02-28 16:38:09.568 WARN 59419 --- [lt-dispatcher-4] .a.f.r.s.t.DefaultDelegationTokenManager : Tokens update task not started because either no tokens obtained or none of the tokens specified its renewal date
2024-02-28 16:38:09.569 INFO 59419 --- [ter-io-thread-4] o.a.f.r.h.n.e.EmbeddedLeaderService : Received confirmation of leadership for leader pekko://flink/user/rpc/resourcemanager_1 , session=5f111541-15bc-4d44-aab7-f393c1ffdcaf
2024-02-28 16:38:09.571 INFO 59419 --- [lt-dispatcher-4] o.a.f.runtime.taskexecutor.TaskExecutor : Connecting to ResourceManager pekko://flink/user/rpc/resourcemanager_1(aab7f393c1ffdcaf5f11154115bc4d44).
2024-02-28 16:38:09.571 INFO 59419 --- [ter-io-thread-1] o.a.f.r.h.n.e.EmbeddedLeaderService : Received confirmation of leadership for leader pekko://flink/user/rpc/dispatcher_2 , session=b7c356f2-b9ce-4254-90a1-0d13518c6883
2024-02-28 16:38:09.601 INFO 59419 --- [lt-dispatcher-8] o.a.f.runtime.taskexecutor.TaskExecutor : Resolved ResourceManager address, beginning registration
2024-02-28 16:38:09.608 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.r.StandaloneResourceManager : Registering TaskManager with ResourceID 9acc0233-57a4-4d8b-8c97-0e17008e0dca (pekko://flink/user/rpc/taskmanager_0) at ResourceManager
2024-02-28 16:38:09.610 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.dispatcher.StandaloneDispatcher : Received JobGraph submission 'evp job' (bbc2959a1228d4782ade6888026503f1).
2024-02-28 16:38:09.610 INFO 59419 --- [lt-dispatcher-6] o.a.f.runtime.taskexecutor.TaskExecutor : Successful registration at resource manager pekko://flink/user/rpc/resourcemanager_1 under registration id 2e75d4e948fb7413bdd8ddaa65535673.
2024-02-28 16:38:09.611 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.dispatcher.StandaloneDispatcher : Submitting job 'evp job' (bbc2959a1228d4782ade6888026503f1).
2024-02-28 16:38:09.612 INFO 59419 --- [lt-dispatcher-4] o.a.f.r.r.s.FineGrainedSlotManager : Registering task executor 9acc0233-57a4-4d8b-8c97-0e17008e0dca under 2e75d4e948fb7413bdd8ddaa65535673 at the slot manager.
2024-02-28 16:38:09.627 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.h.n.e.EmbeddedLeaderService : Proposing leadership to the contender that is registered under component ID 'job-bbc2959a1228d4782ade6888026503f1'.
2024-02-28 16:38:09.628 INFO 59419 --- [ter-io-thread-2] a.f.r.j.JobMasterServiceLeadershipRunner : JobMasterServiceLeadershipRunner for job bbc2959a1228d4782ade6888026503f1 was granted leadership with leader id 624ca6ad-7a55-43ed-9668-28c7ce10c3b2. Creating new JobMasterServiceProcess.
2024-02-28 16:38:09.641 INFO 59419 --- [ger-io-thread-1] o.a.f.runtime.rpc.pekko.PekkoRpcService : Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at pekko://flink/user/rpc/jobmanager_3 .
2024-02-28 16:38:09.649 INFO 59419 --- [ger-io-thread-1] o.a.flink.runtime.jobmaster.JobMaster : Initializing job 'evp job' (bbc2959a1228d4782ade6888026503f1).
2024-02-28 16:38:09.691 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.dispatcher.StandaloneDispatcher : Job bbc2959a1228d4782ade6888026503f1 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
	... 3 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	... 3 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2045)
	at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1909)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2235)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:101)
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
	... 4 more
2024-02-28 16:38:09.695 INFO 59419 --- [ter-io-thread-3] o.a.f.r.dispatcher.StandaloneDispatcher : Job bbc2959a1228d4782ade6888026503f1 has been registered for cleanup in the JobResultStore after reaching a terminal state.
2024-02-28 16:38:09.796 INFO 59419 --- [onPool-worker-2] o.a.f.runtime.minicluster.MiniCluster : Shutting down Flink Mini Cluster
2024-02-28 16:38:09.801 INFO 59419 --- [lt-dispatcher-6] o.a.f.runtime.taskexecutor.TaskExecutor : Stopping TaskExecutor pekko://flink/user/rpc/taskmanager_0.
2024-02-28 16:38:09.802 INFO 59419 --- [lt-dispatcher-6] o.a.f.runtime.taskexecutor.TaskExecutor : Close ResourceManager connection 7f55f62128eada4a14d21fb554184f7a.
2024-02-28 16:38:09.803 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.r.StandaloneResourceManager : Closing TaskExecutor connection 9acc0233-57a4-4d8b-8c97-0e17008e0dca because: The TaskExecutor is shutting down.
2024-02-28 16:38:09.803 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.r.s.FineGrainedSlotManager : Unregistering task executor 2e75d4e948fb7413bdd8ddaa65535673 from the slot manager.
2024-02-28 16:38:09.804 INFO 59419 --- [lt-dispatcher-6] askExecutorStateChangelogStoragesManager : Shutting down TaskExecutorStateChangelogStoragesManager.
2024-02-28 16:38:09.804 INFO 59419 --- [lt-dispatcher-6] ecutorChannelStateExecutorFactoryManager : Shutting down TaskExecutorChannelStateExecutorFactoryManager.
2024-02-28 16:38:09.807 INFO 59419 --- [onPool-worker-2] o.a.f.r.d.DispatcherRestEndpoint : Shutting down rest endpoint.
2024-02-28 16:38:09.813 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.t.DefaultJobLeaderService : Stop job leader service.
2024-02-28 16:38:09.814 INFO 59419 --- [lt-dispatcher-6] .r.s.TaskExecutorLocalStateStoresManager : Shutting down TaskExecutorLocalStateStoresManager.
2024-02-28 16:38:09.818 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.io.disk.FileChannelManagerImpl : FileChannelManager removed spill file directory /home/evuser3/tmp/flink-io-c0cdff82-d2b4-4706-80db-dd401f5c90e3
2024-02-28 16:38:09.819 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.i.n.NettyShuffleEnvironment : Shutting down the network environment and its components.
2024-02-28 16:38:09.823 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.io.disk.FileChannelManagerImpl : FileChannelManager removed spill file directory /home/evuser3/tmp/flink-netty-shuffle-729f4439-d93a-4e70-bbf7-742fdd082feb
2024-02-28 16:38:09.823 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.taskexecutor.KvStateService : Shutting down the kvState service and its components.
2024-02-28 16:38:09.823 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.t.DefaultJobLeaderService : Stop job leader service.
2024-02-28 16:38:09.824 INFO 59419 --- [lt-dispatcher-6] o.a.flink.runtime.filecache.FileCache : removed file cache directory /home/evuser3/tmp/flink-dist-cache-101fc1d5-3b33-4859-b815-78e0f97146c7
2024-02-28 16:38:09.826 INFO 59419 --- [lt-dispatcher-6] o.a.f.runtime.taskexecutor.TaskExecutor : Stopped TaskExecutor pekko://flink/user/rpc/taskmanager_0.
2024-02-28 16:38:09.828 INFO 59419 --- [onPool-worker-7] o.a.f.r.d.DispatcherRestEndpoint : Removing cache directory /home/evuser3/tmp/flink-web-ui
2024-02-28 16:38:09.829 INFO 59419 --- [onPool-worker-7] o.a.f.r.d.DispatcherRestEndpoint : http://localhost:34096 lost leadership
2024-02-28 16:38:09.830 INFO 59419 --- [onPool-worker-7] o.a.f.r.d.DispatcherRestEndpoint : Shut down complete.
2024-02-28 16:38:09.831 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.r.StandaloneResourceManager : Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
2024-02-28 16:38:09.832 INFO 59419 --- [lt-dispatcher-7] r.e.c.DispatcherResourceManagerComponent : Closing components.
2024-02-28 16:38:09.833 INFO 59419 --- [lt-dispatcher-7] a.f.r.d.r.SessionDispatcherLeaderProcess : Stopping SessionDispatcherLeaderProcess.
org.apache.flink.util.FlinkException: Failed to execute job 'evp job'.
2024-02-28 16:38:09.833 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.dispatcher.StandaloneDispatcher : Stopping dispatcher pekko://flink/user/rpc/dispatcher_2.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2253)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2125)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
2024-02-28 16:38:09.834 INFO 59419 --- [lt-dispatcher-6] o.a.f.r.dispatcher.StandaloneDispatcher : Stopping all currently running jobs of dispatcher pekko://flink/user/rpc/dispatcher_2.
2024-02-28 16:38:09.834 INFO 59419 --- [lt-dispatcher-7] o.a.f.r.r.ResourceManagerServiceImpl : Stopping resource manager service.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
	at com.modi.evp.biz.job.EvpJob.mainJob(EvpJob.java:40)
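Reference answer:
This problem may be caused by some dependencies not being packaged into the jar correctly. The stack trace shows the JobMaster failing to load org.apache.flink.api.common.ExecutionConfig through the AppClassLoader while deserializing the job, so at runtime the Flink classes are not visible to that classloader. If the jar is a Spring Boot repackaged jar (the log line format suggests Spring Boot), its dependencies sit in nested jars under BOOT-INF/lib that the AppClassLoader cannot see, which can produce this kind of error even though the classes are inside the jar; a flat jar such as the one built below avoids that. You can try the following: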
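- Make sure your project includes all the necessary dependencies. Check them in your pom.xml file and make sure they are all declared and referenced correctly. In particular, dependencies declared with provided scope are not bundled by the jar-with-dependencies descriptor below, which only packages runtime-scope dependencies.
- Use Maven's maven-assembly-plugin to build an executable jar that contains all dependencies. Add the following configuration to your pom.xml file: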
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>fully qualified name of your main class, e.g. com.example.Main</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
- Build the project and generate the executable jar by running the following command:
mvn clean package
- Run the generated jar:
java -jar target/your-project-name-version-jar-with-dependencies.jar
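To verify the packaging step, a small standalone checker can show where the class named in the stack trace actually ended up in the built jar. This is a minimal sketch using only java.util.jar; the class name JarClassLocator is hypothetical, and BOOT-INF/lib/ is simply where Spring Boot's repackaged jars keep dependency jars. A flat jar-with-dependencies build would be expected to report the class at the top level.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarInputStream;

/** Hypothetical helper: reports whether a class sits at the top level of a jar or only in nested jars. */
public final class JarClassLocator {
    private JarClassLocator() {}

    /** "a.b.C" becomes the jar entry name "a/b/C.class". */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** True if the class is a top-level entry, as in a flat jar-with-dependencies build. */
    public static boolean inTopLevel(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName(className)) != null;
        }
    }

    /** Names of jars nested inside this jar (e.g. under BOOT-INF/lib/) that contain the class. */
    public static List<String> inNestedJars(String jarPath, String className) throws IOException {
        String wanted = entryName(className);
        List<String> hits = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry outer = entries.nextElement();
                if (!outer.getName().endsWith(".jar")) {
                    continue; // only nested jars are scanned here
                }
                try (InputStream raw = jar.getInputStream(outer);
                     JarInputStream nested = new JarInputStream(raw)) {
                    for (JarEntry e = nested.getNextJarEntry(); e != null; e = nested.getNextJarEntry()) {
                        if (e.getName().equals(wanted)) {
                            hits.add(outer.getName());
                            break;
                        }
                    }
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: java JarClassLocator.java <jar> [className]");
            return;
        }
        String className = args.length > 1 ? args[1] : "org.apache.flink.api.common.ExecutionConfig";
        System.out.println("top level:   " + inTopLevel(args[0], className));
        System.out.println("nested jars: " + inNestedJars(args[0], className));
    }
}
```

On Java 11 or later it can be run directly from source, for example: java JarClassLocator.java target/your-project-name-version-jar-with-dependencies.jar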
This should solve your problem. If it persists, check the logs for more details and make sure your environment is configured correctly.
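If the class is in the jar but the error persists, the next question is which classloader can see it; the trace shows the failing lookup going through AppClassLoader. The following is a minimal diagnostic sketch using only JDK APIs; ClassVisibilityCheck is a hypothetical helper. The idea is to call report(...) early in the job's entry point (for example at the top of EvpJob.mainJob, before env.execute()) and compare the system classloader with the thread context classloader.

```java
/** Hypothetical diagnostic: can a given classloader resolve a class name? */
public final class ClassVisibilityCheck {
    private ClassVisibilityCheck() {}

    /** True if {@code loader} resolves {@code className}; the class is not initialized. A null loader means the bootstrap loader. */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Prints visibility through the system (App) loader and the current thread's context loader. */
    public static void report(String className) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println(className);
        System.out.println("  system loader  (" + system + "): " + isVisible(className, system));
        System.out.println("  context loader (" + context + "): " + isVisible(className, context));
    }

    public static void main(String[] args) {
        report(args.length > 0 ? args[0] : "org.apache.flink.api.common.ExecutionConfig");
    }
}
```

If the context loader reports true while the system loader reports false, the Flink classes are probably only reachable through a custom (for example nested-jar) classloader, which points back at the packaging rather than at a missing dependency.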
For more answers to this question, see:
https://developer.aliyun.com/ask/600154?spm=a2c6h.12873639.article-detail.7.50e24378TRW91E
Question 2: Problem syncing data to Hologres with Flink CDC?
Syncing data from RDS into Hologres with Flink CDC:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS <holo_database_name>.<new_table_name>
AS TABLE <source_database_type>.<database_name>.<source_table_name>;
END;
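The catalogs were created beforehand, but writing it as above gives an error.
Reference answer:
When using Flink CDC (Change Data Capture) to sync data into Hologres (Holo, Alibaba Cloud's real-time data warehouse service), you may run into the following problems; here is how to address them: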
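- Syncing deletes: when a record is deleted in the source database (for example in a product table), Flink CDC captures the change by reading the binlog or through another mechanism and emits a corresponding delete, which the sink applies, so the data synced to Hologres stays consistent.
- Tuning the connection pool size: if you hit a performance bottleneck during sync, consider increasing the CDC source's connection.pool.size parameter. A larger value raises concurrency and may improve sync throughput.
- Creating analyzers: this applies when syncing to Elasticsearch rather than Hologres. There you can specify the analyzer when creating the index through the createIndexRequest() method, so text fields are tokenized correctly during sync.
- Full vs. incremental reads: if MySQL CDC reads only the full snapshot and no incremental data, the cause is likely a configuration or environment problem. Check the job configuration and running state to make sure the CDC job can capture and sync incremental changes.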
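- Backpressure: if the full-table read phase is slow or shows backpressure, tune the job, for example by increasing parallelism or adjusting buffer sizes, to improve throughput.
- Job failure handling: when a Flink CDC job fails, it either exits completely or tries to restart, depending on its configuration. Configure the job's failure recovery strategy to suit your scenario and requirements.
- Window function support: because MySQL/Hologres CDC source tables do not support window functions, requirements such as per-minute aggregation may need to be implemented another way in the Flink application, for example with a ProcessFunction.
- CDC tables as sources only: a CDC table can usually only be used as a source; using it as a sink or for other purposes requires additional design and configuration.
- Regular expression parsing: if the table-name regular expression fails because a comma cannot be parsed, you may need to adjust the expression or use a different matching pattern.
- Restart position: when a job restarts, the MySQL CDC source table normally resumes from the position where the job stopped rather than from the beginning, which keeps the data continuous and consistent. If it behaves otherwise, check the configuration and logs to find the cause.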
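The comma problem in the table-name item above can be illustrated with plain java.util.regex. The sketch assumes the pattern ends up in a comma-separated option (Debezium documents table.include.list as a comma-separated list of regular expressions), so a quantifier such as {1,2} gets cut in two. The class TableNamePatternDemo and the table names orders_7 and orders_12 are hypothetical. An equivalent pattern written with | instead of a comma survives the split.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

/** Hypothetical demo: a comma inside a regex breaks when the value is split on commas. */
public final class TableNamePatternDemo {
    private TableNamePatternDemo() {}

    /** Splits a value the way a comma-separated option list is split. */
    public static List<String> splitOnCommas(String value) {
        return Arrays.asList(value.split(","));
    }

    /** True if every comma-separated piece still compiles as a Java regex. */
    public static boolean allPiecesCompile(String value) {
        for (String piece : splitOnCommas(value)) {
            try {
                Pattern.compile(piece.trim());
            } catch (PatternSyntaxException e) {
                return false;
            }
        }
        return true;
    }

    /** Whole-string match of a table name against a pattern. */
    public static boolean matches(String regex, String tableName) {
        return Pattern.matches(regex, tableName);
    }

    public static void main(String[] args) {
        String withComma = "orders_\\d{1,2}";                // splits into "orders_\d{1" and "2}"
        String withAlternation = "orders_\\d|orders_\\d\\d"; // same tables, no comma
        System.out.println("comma version compiles after split: " + allPiecesCompile(withComma));
        System.out.println("alternation compiles after split:   " + allPiecesCompile(withAlternation));
        System.out.println("orders_12 matches alternation:      " + matches(withAlternation, "orders_12"));
    }
}
```

The alternation form trades brevity for robustness: it lists each length explicitly, but contains no character that a comma-separated parser could cut.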
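In summary, syncing data into Hologres with Flink CDC can raise a number of challenges, but they can usually be solved with suitable configuration and tuning. Understanding how Flink CDC works also helps in dealing with them.
For more answers to this question, see: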
https://developer.aliyun.com/ask/599604?spm=a2c6h.12873639.article-detail.8.50e24378TRW91E
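Question 3: Is there a way to do this with a single Flink CDC sync?
We currently have three DRDS instances, and each DRDS instance has a number of RDS instances attached to it.
Do we need three Flink CDC syncs?
Or is there a way to do it with a single Flink CDC sync?
Reference answer:
DRDS is not supported yet.
For more answers to this question, see: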
https://developer.aliyun.com/ask/599598?spm=a2c6h.12873639.article-detail.9.50e24378TRW91E
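Question 4: I'm using Flink CDC 2.4. Why does my program read data from Oracle repeatedly?
I'm using Flink CDC 2.4. Why does the program read data from Oracle repeatedly when it runs? The timing of the repeats seems related to my checkpoint interval. How can I fix this?
Reference answer:
If you see data being read repeatedly from Oracle with Flink CDC 2.4, there are several possible causes. Here are the likely causes and the corresponding solutions: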
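- Check the Oracle database configuration: make sure archive logging is enabled on the Oracle database and supplemental logging is enabled on the captured tables or database. This is a basic requirement for real-time change capture.
- Check the CDC configuration: confirm that your Flink CDC configuration is correct, especially the checkpoint-related settings. An incorrect checkpoint configuration can cause data to be read repeatedly.
- Check the Flink job state: if the job is under backpressure, the CDC source's log reader (for Oracle, the LogMiner-based reader) may be unable to keep reading, which can lead to repeated reads. The fix is to tune the job to reduce backpressure.
- Check the database connection timeout: if the connection timeout configured on the database is too short, the source's connection may be cut off, disrupting normal reads. Try increasing the database's connection timeout.
- Version compatibility: Flink CDC has supported Oracle since version 2.1, but an older Flink CDC version or an older Oracle database version may still have compatibility problems. Make sure your Flink CDC version is compatible with your Oracle version.
- Monitoring and log analysis: monitoring the job's running state and reading its logs helps you pinpoint the problem more precisely.
- Ask the community or support: if none of the above helps, ask in the Flink community or seek official support, providing a detailed description of the problem and your configuration.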
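One plausible reading of "the repeats track the checkpoint interval" can be pictured with a toy model. This is not the Oracle CDC connector's code; it only assumes Flink's standard recovery behavior, where a restarted job resumes from the source offset stored in its last completed checkpoint, so changes read after that checkpoint are read again (a longer interval means a larger replayed window). CheckpointReplayDemo and its methods are hypothetical. The model contrasts an append-only sink, where the replay appears as duplicate rows, with a sink that upserts by primary key, where it does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of at-least-once replay after restoring from a checkpoint. */
public final class CheckpointReplayDemo {
    private CheckpointReplayDemo() {}

    /** One change event: position in the log, the row's primary key, and its new value. */
    public record Change(long offset, String key, String value) {}

    /**
     * Delivers entries with offset < failAt, "fails", then restarts from checkpointOffset
     * (assumed <= failAt) and delivers everything from there to the end of the log.
     */
    public static List<Change> deliverWithFailure(List<Change> log, long checkpointOffset, long failAt) {
        List<Change> delivered = new ArrayList<>();
        for (Change c : log) {
            if (c.offset() < failAt) delivered.add(c);
        }
        for (Change c : log) {
            if (c.offset() >= checkpointOffset) delivered.add(c); // replayed after restore
        }
        return delivered;
    }

    /** Rows an append-only sink would hold: every delivery, duplicates included. */
    public static int appendOnlyRowCount(List<Change> delivered) {
        return delivered.size();
    }

    /** Final state of a sink that upserts by primary key: replays overwrite instead of duplicating. */
    public static Map<String, String> upsertByKey(List<Change> delivered) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : delivered) {
            table.put(c.key(), c.value());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
                new Change(0, "id1", "a"),
                new Change(1, "id2", "b"),
                new Change(2, "id1", "c"));
        // The last completed checkpoint stored offset 1; the job fails after reading offset 2.
        List<Change> delivered = deliverWithFailure(log, 1, 3);
        System.out.println("append-only rows: " + appendOnlyRowCount(delivered));
        System.out.println("upsert result:    " + upsertByKey(delivered));
    }
}
```

In this model the append-only sink ends up with five rows for three changes, while the upsert sink's final state is the same as if no failure had happened.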
For more answers to this question, see:
https://developer.aliyun.com/ask/599301?spm=a2c6h.12873639.article-detail.10.50e24378TRW91E
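Question 5: Why does this stay at 100% in Flink CDC?
Why does this stay at 100% in Flink CDC? I didn't touch the source; I only changed this bit of logic in the composer. The MySQL-to-Doris pipeline runs normally, but after I changed the MySQL-to-Hudi pipeline this way, the first operator immediately went to 100% busy. Again, I didn't touch the source.
Reference answer:
The SchemaOperator is probably blocked.
For more answers to this question, see: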
https://developer.aliyun.com/ask/599299?spm=a2c6h.12873639.article-detail.11.50e24378TRW91E