开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC 实时同步,任务挂掉了,启动才能保证不丢数据吗

Flink CDC 实时同步,任务挂掉了,再次启动的话是必须从checkpoint启动才能保证不丢数据吗 如果checkpoint丢失咋办?2023-11-13 03:33:09,843 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Java heap space
at akka.dispatch.MessageDispatcher.unbatchedExecute(AbstractDispatcher.scala:163) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.MessageDispatcher.execute(AbstractDispatcher.scala:96) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_382]
2023-11-13 03:33:09,844 ERROR org.apache.flink.runtime.rpc.akka.RobustActorSystem$1 [] - exception on LARS’ timer thread
java.lang.OutOfMemoryError: Java heap space
at akka.dispatch.MessageDispatcher.unbatchedExecute(AbstractDispatcher.scala:163) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:133) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:124) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.dispatch.MessageDispatcher.execute(AbstractDispatcher.scala:96) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:365) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:314) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:318) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:270) ~[flink-rpc-akka_a6abeac8-0973-4049-bcd5-0127ad161621.jar:1.16.2]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-11-13 03:33:09,844 INFO org.apache.flink.runtime.rpc.akka.RobustActorSystem$1 [] - starting new LARS thread两个并行度就oom了

展开
收起
真的很搞笑 2023-11-21 09:16:52 115 0
3 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据您提供的日志信息,似乎是因为Flink任务无法分配足够的内存导致OOM(Out Of Memory)异常,从而导致Flink任务崩溃。
    您可以通过调整任务的内存大小来解决这个问题。在Flink任务启动时,您可以设置JVM内存大小,并增加堆栈大小以增加可用的内存。例如:

    ./bin/flink run -m yarn-cluster --jars $FLINK_HOME/lib/* -Dtaskmanager.memory.process.size=4000m -Dtaskmanager.heap.size=2048m -ynm myjob ./examples/streaming/MyJob.jar
    

    上述示例将把TaskManager的内存大小设为4GB,并将堆栈大小设为2GB。
    除了增加内存之外,您还可以考虑其他方法来缓解问题,如:

    • 减少任务的并发度:减少任务的并发度可以减轻对内存的压力。
    • 减小并行度:减小任务的并行度可以减少内存的需求,并且可以改善总体性能。
    2023-11-21 23:12:43
    赞同 1 展开评论 打赏
  • 如果你的Flink任务因为OOM(OutOfMemoryError)错误而失败,那么你需要首先解决内存溢出的问题。这可能是因为你为任务分配的内存不足,或者是因为你的任务产生了大量的垃圾对象导致内存耗尽。

    关于如何解决OOM错误,你可以参考这篇文章:https://www.jianshu.com/p/68e15d856f0

    至于你提到的Checkpoint丢失的问题,如果你希望在任务失败后从最近的Checkpoint恢复,你需要确保你的任务配置了Checkpoint,并且Checkpoint是成功的。如果Checkpoint成功,它们会被持久化到存储系统中,这样在任务失败后可以从最近的Checkpoint恢复。

    如果你的Checkpoint丢失了,你可以尝试重新生成Checkpoint。但是请注意,这可能会导致数据丢失,因为你只能恢复到Checkpoint保存的状态。

    2023-11-21 14:58:49
    赞同 展开评论 打赏
  • 这个错误提示是由于Java堆内存溢出导致的。Java堆内存用于存放对象实例和数组,当程序试图创建一个新对象或数组时,如果没有足够的内存分配给它们,就会抛出OutOfMemoryError异常。要解决这个问题,有以下几种方法:

    1. 增加Java虚拟机(JVM)的最大堆内存大小。可以通过设置-Xms-Xmx参数来控制JVM初始和最大堆内存大小。例如,如果您想把最大堆内存设置为1GB,可以使用如下命令:
    -Xms1g -Xmx1g
    
    1. 调整Flink的任务并发度。任务并发度是指同时执行的任务数。降低任务并发度可以帮助减少Java堆内存的需求,从而避免内存溢出的问题。
    2. 使用更高效的算法或数据结构。优化代码逻辑或采用更高效的算法也可以帮助减少内存使用。
    3. 分配更多的物理内存给Flink。如果您的机器有足够的空闲内存,可以考虑增加Flink使用的内存量。
    2023-11-21 14:55:30
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载