
Flink: restoring a job from a checkpoint fails

冷丰 2019-01-02 22:10:41 1970
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

case class TestClass2(name: String, money: Int, var count: Long, var flage: Boolean)

object CheckpointExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10000)
  val checkpointConf = env.getCheckpointConfig
  checkpointConf.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // Retain the externalized checkpoint (do not clean it up) when the job is cancelled.


  /*  kafka init  */
  val orderProperties = new Properties()
  //orderProperties.setProperty("bootstrap.servers", "172.24.80.110:9092")
  orderProperties.setProperty("bootstrap.servers", "192.168.1.160:19092")
  orderProperties.setProperty("group.id", "id1")
  val orderKafkaConsumer = new FlinkKafkaConsumer011[String]("test_topic_h1", new SimpleStringSchema(), orderProperties)

  val source = env.addSource(orderKafkaConsumer).uid("source")
  val t1 = source.map(_.split(",")).map(x => TestClass2(x(0), x(1).toInt, x(2).toLong, false)).uid("map")
  val t4 = t1.keyBy(_.name).sum("money")
  t4.print()

  env.execute("10086")
}
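Incidentally, the two `map` calls in the job assume every Kafka record is well formed; after a restore, a single malformed record (e.g. `hb,40` or `hb,x,0`) would throw and fail the job again. A defensive parse could be sketched in plain Scala (the `parseRecord` helper is hypothetical, not part of the original job):

```scala
import scala.util.Try

case class TestClass2(name: String, money: Int, var count: Long, var flage: Boolean)

// Parse one CSV record into TestClass2, returning None for malformed input
// instead of throwing (which would crash a restored job on a bad record).
def parseRecord(line: String): Option[TestClass2] =
  line.split(",") match {
    case Array(name, money, count) =>
      (for {
        m <- Try(money.trim.toInt)
        c <- Try(count.trim.toLong)
      } yield TestClass2(name, m, c, flage = false)).toOption
    case _ => None
  }

// In the job this would replace the two map calls:
//   source.flatMap(parseRecord(_)).uid("map")
println(parseRecord("hb,40,0")) // Some(TestClass2(hb,40,0,false))
println(parseRecord("hb,x,0"))  // None
```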

With the code above, feeding these two records into Kafka:
hb,40,0
hb,40,0
produces the correct result: 3> TestClass2(hb,80,0,false)
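That printed result is consistent with a per-key running sum over the `money` field. As a plain-Scala sanity check of the expected value (a simplified model of `keyBy(_.name).sum("money")`, not Flink's actual implementation):

```scala
case class TestClass2(name: String, money: Int, var count: Long, var flage: Boolean)

// Model of keyBy(_.name).sum("money"): per key, keep the latest record
// but accumulate the money field across records with the same key.
def runningSums(records: Seq[TestClass2]): Map[String, TestClass2] =
  records.foldLeft(Map.empty[String, TestClass2]) { (state, r) =>
    val merged = state.get(r.name) match {
      case Some(prev) => r.copy(money = prev.money + r.money)
      case None       => r
    }
    state + (r.name -> merged)
  }

val in = Seq(TestClass2("hb", 40, 0L, false), TestClass2("hb", 40, 0L, false))
println(runningSums(in)("hb")) // TestClass2(hb,80,0,false)
```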
However, restoring the job from the checkpoint does not succeed. The restore command is:
$FLINK_HOME/bin/flink run -s hdfs://hadoop:8020/flink/flink-checkpoints/b01fbd688ab62cb20e5fe75ca1b6fba0/chk-20/_metadata --class ${MAIN_CLASS} ${LIB_DIR}/${LIB_NAME}
(the variable values are correct.)

Error log:

2019-01-02 21:53:39,859 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to airflow (auth:SIMPLE)
2019-01-02 21:53:39,911 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2019-01-02 21:53:39,917 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2019-01-02 21:53:40,392 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
2019-01-02 21:53:41,384 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating highly available BLOB storage directory at hdfs://qa-bigdata001.ecs.east1-e:8020/flink/recovery//default/blob
2019-01-02 21:53:41,525 INFO  org.apache.flink.runtime.util.ZooKeeperUtils                  - Enforcing default ACL for ZK connections
2019-01-02 21:53:41,526 INFO  org.apache.flink.runtime.util.ZooKeeperUtils                  - Using '/flink/default' as Zookeeper namespace.
2019-01-02 21:53:41,608 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Starting
2019-01-02 21:53:41,617 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:host.name=qa-bigdata004.ecs.east1-e
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.8.0_77
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle Corporation
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.home=/usr/java/jdk1.8.0_77/jre
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.class.path=/home/airflow/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/home/airflow/flink-1.5.0/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/home/airflow/flink-1.5.0/lib/log4j-1.2.17.jar:/home/airflow/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/home/airflow/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar::/etc/hadoop/conf:
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.io.tmpdir=/tmp
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:java.compiler=<NA>
2019-01-02 21:53:41,618 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.name=Linux
2019-01-02 21:53:41,619 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.arch=amd64
2019-01-02 21:53:41,619 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:os.version=2.6.32-696.1.1.el6.x86_64
2019-01-02 21:53:41,619 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.name=airflow
2019-01-02 21:53:41,619 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.home=/home/airflow
2019-01-02 21:53:41,619 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Client environment:user.dir=/home/airflow/streaming-project/flink/flink-example
2019-01-02 21:53:41,620 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=172.24.80.87:2181,172.24.80.88:2181,172.24.80.89:2181, sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5e01a982
2019-01-02 21:53:41,636 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-2931078451485661056.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2019-01-02 21:53:41,639 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 172.24.80.88/172.24.80.88:2181
2019-01-02 21:53:41,639 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
2019-01-02 21:53:41,640 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to 172.24.80.88/172.24.80.88:2181, initiating session
2019-01-02 21:53:41,649 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 172.24.80.88/172.24.80.88:2181, sessionid = 0x2653fad5ca288a9, negotiated timeout = 40000
2019-01-02 21:53:41,651 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
2019-01-02 21:53:41,935 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2019-01-02 21:53:41,939 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-01-02 21:53:41,967 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-01-02 21:53:41,967 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2019-01-02 21:53:41,974 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: false)
2019-01-02 21:53:42,398 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 64e1046a98a0be642d99e9e0a16bfb66 (detached: false).
2019-01-02 21:53:42,399 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Requesting blob server port.
2019-01-02 21:54:12,443 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2019-01-02 21:54:12,449 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2019-01-02 21:54:12,449 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-01-02 21:54:12,451 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-01-02 21:54:12,452 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - backgroundOperationsLoop exiting
2019-01-02 21:54:12,455 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 0x2653fad5ca288a9 closed
2019-01-02 21:54:12,455 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - EventThread shut down for session: 0x2653fad5ca288a9
2019-01-02 21:54:12,456 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:258)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.dianwoda.bigdata.flink.example.CheckpointExample$.delayedEndpoint$com$dianwoda$bigdata$flink$example$CheckpointExample$1(CheckpointExample.scala:65)
        at com.dianwoda.bigdata.flink.example.CheckpointExample$delayedInit$body.apply(CheckpointExample.scala:38)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.dianwoda.bigdata.flink.example.CheckpointExample$.main(CheckpointExample.scala:38)
        at com.dianwoda.bigdata.flink.example.CheckpointExample.main(CheckpointExample.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$5(RestClusterClient.java:357)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:834)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1088)
        at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
        at java.util.concurrent.CompletableFuture$CoCompletion.tryFire(CompletableFuture.java:1020)
        ... 10 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        ... 10 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        ... 10 more
Caused by: java.util.concurrent.TimeoutException
        ... 8 more

Any ideas on how to fix this?
