
Question: Flink CDC Oracle job fails with an error

Maven dependencies

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-oracle-cdc</artifactId>
            <version>${flinkcdc.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
         <dependency>
             <groupId>com.oracle.database.jdbc</groupId>
             <artifactId>ojdbc10</artifactId>
             <version>19.10.0.0</version>
         </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.vsersion}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

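Code
```java
import java.util.Properties;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The wrapper class name is illustrative; the original post showed only main().
public class OracleCdcJob {
    public static void main(String[] args) throws Exception {
        // Debezium properties passed through to the connector
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("log.mining.strategy", "online_catalog");

        // Oracle source definition (host, port, username, password, ...)
        JdbcIncrementalSource<String> oracleChangeEventSource =
                new OracleSourceBuilder<String>()
                        .hostname("192.168.1.253")
                        .port(1521)
                        .databaseList("newdb")
                        .schemaList("payment")
                        .tableList("payment.order")
                        .username("payment")
                        .password("payment")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .includeSchemaChanges(true)
                        .startupOptions(StartupOptions.initial())
                        .debeziumProperties(debeziumProperties)
                        .splitSize(2)
                        .build();

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 3 s; restart at most 3 times, 2 s apart
        environment.enableCheckpointing(3000L);
        environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));

        environment
                .fromSource(oracleChangeEventSource,
                        WatermarkStrategy.noWatermarks(),
                        "OracleParallelSource")
                .print()
                .setParallelism(1);
        environment.execute("Print Oracle Snapshot + RedoLog");
    }
}
```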



After packaging with Maven, I uploaded the jar in the web UI and submitted it, and the job failed.

![image.png](https://ucc.alicdn.com/pic/developer-ecology/m666eo5xcszli_2622060522c744c9b7b655e0ac915da7.png)


Failure log
```
2023-10-24 20:07:25
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=2000)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:102)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:301)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:618)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: OracleParallelSource -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:226)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:405)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:151)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
    ... 8 more
Caused by: java.lang.RuntimeException: Failed to resolve Oracle database version
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:177)
    at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:87)
    at io.debezium.connector.oracle.OracleConnection.<init>(OracleConnection.java:82)
    at com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils.createOracleConnection(OracleConnectionUtils.java:61)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.openJdbcConnection(OracleDialect.java:90)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:107)
    at com.ververica.cdc.connectors.oracle.source.OracleDialect.discoverDataCollections(OracleDialect.java:51)
    at com.ververica.cdc.connectors.base.source.IncrementalSource.createEnumerator(IncrementalSource.java:139)
    ... 9 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@192.168.1.253:1521/newecpss
    at java.sql.DriverManager.getConnection(DriverManager.java:689)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$0(JdbcConnection.java:192)
    at io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:636)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
    at io.debezium.connector.oracle.OracleConnection.resolveOracleDatabaseVersion(OracleConnection.java:147)
    ... 16 more
```

songweixing 2023-10-24 20:26:34 223 0
3 answers
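  • It looks like the Flink job failed with a global failure. The message "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=2000)" means that recovery was suppressed by the fixed-delay restart strategy, which allows at most 3 restart attempts with 2000 ms between them. This usually means the job hit a serious problem during execution that restarting could not recover from. Check your Flink configuration for mistakes, and review your job logic for potential problems.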

    2023-10-25 16:08:56
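  • The log shows that the job failed, the fixed-delay restart strategy then kicked in, and the maximum number of restart attempts was reached, so recovery was suppressed. Check your code logic, make sure the source data is available, and set a reasonable restart strategy.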

    2023-10-25 14:02:57
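  • Judging by the dependencies you posted, you have included everything that is required. A Flink CDC Oracle error can have several causes, including an Oracle version mismatch, Oracle configuration problems, or Flink configuration problems.

    Some possible solutions:

    1. Check that your Oracle version is supported by your Flink CDC Oracle connector version. For example, if your Oracle is 12c, use a connector version that supports 12c.

    2. Check the Oracle configuration and make sure the CDC feature is enabled.

    3. Check the Flink configuration: make sure Flink can connect to Oracle and that the job parallelism is set appropriately.

    4. Check the resources allocated to Flink's task managers and worker nodes, and make sure they are sufficient to process the CDC data.

    5. If the problem persists, look at the Flink logs for more error information.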
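
When reading a log like the one posted in the question, the outermost "Recovery is suppressed" line is only a wrapper; the nested "Caused by" entries end at `java.sql.SQLException: No suitable driver found`. The sketch below shows one way to walk such a chain down to its innermost cause using only standard Java. The class name `RootCause` is hypothetical, and the exception chain is built by hand to mirror messages from the posted log; it does not use Flink's actual exception types.

```java
import java.sql.SQLException;

public class RootCause {

    /** Follows getCause() until the innermost exception is reached. */
    public static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop if an exception names itself as its own cause, to avoid looping forever.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hand-built chain with messages copied from the posted log (assumption: types simplified).
        Throwable chain = new RuntimeException(
                "Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy",
                new RuntimeException(
                        "Failed to resolve Oracle database version",
                        new SQLException(
                                "No suitable driver found for jdbc:oracle:thin:@192.168.1.253:1521/newecpss")));
        System.out.println(of(chain).getMessage());
    }
}
```

Read this way, the restart strategy is a symptom rather than the cause: each of the three restart attempts would be expected to fail on the same innermost exception.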

    2023-10-25 10:39:27
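
Since the innermost cause in the posted log is `No suitable driver found for jdbc:oracle:thin:@192.168.1.253:1521/newecpss`, one quick check is whether any JDBC driver visible to the running code accepts that URL. The following is a minimal sketch using the standard `java.sql.DriverManager` API. The class name `DriverCheck` and the method `driversAccepting` are hypothetical names introduced for illustration, and the URL is copied from the log.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class DriverCheck {

    /** Returns the class names of registered JDBC drivers that accept the given URL. */
    public static List<String> driversAccepting(String url) throws SQLException {
        List<String> accepted = new ArrayList<>();
        // getDrivers() lists the drivers currently registered and visible to the caller.
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            Driver driver = drivers.nextElement();
            if (driver.acceptsURL(url)) {
                accepted.add(driver.getClass().getName());
            }
        }
        return accepted;
    }

    public static void main(String[] args) throws SQLException {
        // URL taken from the "No suitable driver found" line in the posted log.
        String url = "jdbc:oracle:thin:@192.168.1.253:1521/newecpss";
        List<String> found = driversAccepting(url);
        System.out.println(found.isEmpty()
                ? "No registered driver accepts " + url
                : "Accepted by: " + found);
    }
}
```

If nothing is listed when this runs in the same environment as the job, the Oracle driver jar is probably not on the job's classpath, for example because it was not bundled into the submitted jar or is not present in Flink's `lib` directory. That is an assumption about this particular setup, not something the log confirms.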
