[k8s series 5] Analyzing the cause of KubernetesClientException: too old resource version

Summary: an analysis of why KubernetesClientException: too old resource version occurs.

Background

Our company currently schedules workloads on top of k8s (using io.fabric8:kubernetes-client:4.2.0). At runtime we hit the following problem:

 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket close received. code: 1000, reason:
 DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Submitting reconnect task to the executor
[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Scheduling reconnect task
[scheduleReconnect|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Current reconnect backoff is 1000 milliseconds (T0)
[reconnectAttempt|Executor for Watch 1880052106] DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - Connecting websocket ... io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@700f518a
DEBUG io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager - WebSocket successfully opened
  WARN PodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 135562761 (135563127)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:254)[kubernetes-client-4.2.2.jar:?]
at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [okhttp-3.12.0.jar:?]
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [okhttp-3.12.0.jar:?]
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [okhttp-3.12.0.jar:?]
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.12.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

On its own this error would not be a big deal, but our code reads:

watchConnection = kubernetesClient.pods()
  .withLabel(MERLION_TASK_LABEL, applicationId)
  // .withResourceVersion(resourceVersion)
  .watch(new TaskPodsWatcher())

We had already commented out withResourceVersion(resourceVersion) (had it not been commented out, a too-small resourceVersion set in our own code would explain the error), yet too old resource version was still being reported.


Analysis


Jumping straight to WatchConnectionManager#onClosed:

@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
  logger.debug("WebSocket close received. code: {}, reason: {}", code, reason);
  if (forceClosed.get()) {
    logger.debug("Ignoring onClose for already closed/closing websocket");
    return;
  }
  if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
    closeEvent(new KubernetesClientException("Connection unexpectedly closed"));
    return;
  }
  scheduleReconnect();
}

The documentation for onClosed says:

 /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  public void onClosed(WebSocket webSocket, int code, String reason) {
  }

In other words, because no events were transmitted for a long time, the connection was released and the WebSocket was closed (this is quite likely when not many tasks are running). The client then reconnects via scheduleReconnect(), which schedules a call to runWatch():

executor.schedule(new NamedRunnable("reconnectAttempt") {
  @Override
  public void execute() {
    try {
      runWatch();
      reconnectPending.set(false);
    } catch (Exception e) {
      // An unexpected error occurred and we didn't even get an onFailure callback.
      logger.error("Exception in reconnect", e);
      webSocketRef.set(null);
      closeEvent(new KubernetesClientException("Unhandled exception in reconnect attempt", e));
      close();
    }
  }
}, nextReconnectInterval(), TimeUnit.MILLISECONDS);

And inside runWatch(), the client appends the saved resourceVersion to the watch request:

if (this.resourceVersion.get() != null) {
  httpUrlBuilder.addQueryParameter("resourceVersion", this.resourceVersion.get());
}
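
So when the reconnect fires, the request sent to the API server ends up looking roughly like this (schematic; the placeholders are ours, and the resourceVersion value is taken from the log above):

GET /api/v1/namespaces/<namespace>/pods?labelSelector=<MERLION_TASK_LABEL>%3D<applicationId>&resourceVersion=135562761&watch=true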

this.resourceVersion is set in public void onMessage(WebSocket webSocket, String message):

WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof HasMetadata) {
  @SuppressWarnings("unchecked")
  T obj = (T) object;
  // Dirty cast - should always be valid though
  resourceVersion.set(((HasMetadata) obj).getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  watcher.eventReceived(action, obj);
} else if (object instanceof KubernetesResourceList) {
  @SuppressWarnings("unchecked")
  KubernetesResourceList list = (KubernetesResourceList) object;
  // Dirty cast - should always be valid though
  resourceVersion.set(list.getMetadata().getResourceVersion());
  Watcher.Action action = Watcher.Action.valueOf(event.getType());
  List<HasMetadata> items = list.getItems();
  if (items != null) {
    for (HasMetadata item : items) {
      watcher.eventReceived(action, (T) item);
    }
  }
}

In other words, if by the time we reconnect, the resourceVersion we last recorded is already older than the minimum resourceVersion still retained by etcd, the watch fails with a too old resource version error.
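
A minimal way to reproduce this on purpose (a hypothetical sketch; TaskPodsWatcher and the label constants are the ones from our code above) is to pin the watch to an ancient version:

// Watching from a resourceVersion far older than anything etcd still
// retains makes the API server answer with 410 Gone, which fabric8
// surfaces as "too old resource version".
val staleWatch = kubernetesClient.pods()
  .withLabel(MERLION_TASK_LABEL, applicationId)
  .withResourceVersion("1") // ancient version, long since compacted away
  .watch(new TaskPodsWatcher())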


Solution


Searching online for kubernetes-too-old-resource-version turns up an answer in which a Kubernetes Client team member mentions:

Fabric8 does not handle it with plain watch. But it is handling it in SharedInformer API, see ReflectorWatcher. I would recommend using informer API when writing operators since it's better than plain list and watch

In other words, the plain watch mechanism cannot handle this case, but the SharedInformer API can, so we switched to it. As of November 16, 2020, the latest release is kubernetes-client:4.13.0. The implementation:

val sharedInformerFactory = kubernetesClient.informers()
val podInformer = sharedInformerFactory
  .sharedIndexInformerFor(classOf[Pod], classOf[PodList],
    new OperationContext().withNamespace("test"), 30 * 1000L)
podInformer.addEventHandler(new ResourceEventHandler[Pod] {
  override def onAdd(obj: Pod): Unit = {
    eventReceived(obj, "ADD")
  }

  override def onDelete(obj: Pod, deletedFinalStateUnknown: Boolean): Unit = {
    eventReceived(obj, "DELETE")
  }

  override def onUpdate(oldObj: Pod, newObj: Pod): Unit = {
    eventReceived(newObj, "UPDATE")
  }

  private def idShouldUpdate(pod: Pod): Boolean = {
    pod.getMetadata.getLabels.getOrDefault(MERLION_TASK_LABEL, "") == applicationId
  }

  private def eventReceived(pod: Pod, action: String): Unit = {
    if (idShouldUpdate(pod)) {
      val podName = pod.getMetadata.getName
      logger.info(s"Received job pod update for pod named $podName, action ${action}")
      snapshotsStore.updatePod(pod)
    }
  }
})
sharedInformerFactory.startAllRegisteredInformers()

The SharedInformerFactory works the same way as the Kubernetes Informer mechanism and guarantees reliable event delivery.
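
A side benefit is that the informer keeps a local cache we can read without another round trip to the API server. A small usage sketch (hypothetical, based on the 4.13.0 SharedIndexInformer interface; podInformer and logger are from the code above):

import scala.collection.JavaConverters._

// Once the initial list has completed, hasSynced returns true and the
// indexer holds the full set of watched pods.
if (podInformer.hasSynced) {
  val cachedPods = podInformer.getIndexer.list().asScala
  logger.info(s"Informer cache holds ${cachedPods.size} pods")
}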

The key classes are ReflectorWatcher, Reflector, and DefaultSharedIndexInformer. Let's walk through them briefly, starting from the controller's processLoop:

private void processLoop() throws Exception {
  while (true) {
    try {
      this.queue.pop(this.processFunc);
    } catch (InterruptedException t) {
      log.error("DefaultController#processLoop got interrupted {}", t.getMessage(), t);
      return;
    } catch (Exception e) {
      log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
      throw e;
    }
  }
}

The queue here is the DeltaFIFO passed in as a constructor argument, i.e. the queue is a FIFO. So where does the FIFO's data come from? In Controller#run:

if (fullResyncPeriod > 0) {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, fullResyncPeriod);
} else {
  reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext, DEFAULT_PERIOD);
}
reflector.listAndWatch();

reflector.listAndWatch() is then invoked; it implements a list-watch mechanism similar to Kubernetes' own:

public void listAndWatch() throws Exception {
    try {
      log.info("Started ReflectorRunnable watch for {}", apiTypeClass);
      reListAndSync();
      resyncExecutor.scheduleWithFixedDelay(this::reListAndSync, 0L, resyncPeriodMillis, TimeUnit.MILLISECONDS);
      startWatcher();
    } catch (Exception exception) {
      store.isPopulated(false);
      throw new RejectedExecutionException("Error while starting ReflectorRunnable watch", exception);
    }
  }

reListAndSync pulls the complete state (a full LIST), while startWatcher starts the watch that receives incremental events. What does that watch look like?

watch.set(
  listerWatcher.watch(
    new ListOptionsBuilder()
      .withWatch(Boolean.TRUE)
      .withResourceVersion(lastSyncResourceVersion.get())
      .withTimeoutSeconds(null)
      .build(),
    operationContext.getNamespace(), operationContext, watcher)
);

The watcher here is initialized in Reflector's constructor:

watcher = new ReflectorWatcher<>(store, lastSyncResourceVersion, this::startWatcher, this::reListAndSync);

ReflectorWatcher implements Watcher, so it provides the corresponding eventReceived and onClose methods:

@Override
public void eventReceived(Action action, T resource) {
  if (action == null) {
    final String errorMessage = String.format("Unrecognized event %s", resource.getMetadata().getName());
    log.error(errorMessage);
    throw new KubernetesClientException(errorMessage);
  }
  log.info("Event received {}", action.name());
  switch (action) {
    case ERROR:
      final String errorMessage = String.format("ERROR event for %s", resource.getMetadata().getName());
      log.error(errorMessage);
      throw new KubernetesClientException(errorMessage);
    case ADDED:
      store.add(resource);
      break;
    case MODIFIED:
      store.update(resource);
      break;
    case DELETED:
      store.delete(resource);
      break;
  }
  lastSyncResourceVersion.set(resource.getMetadata().getResourceVersion());
  log.debug("{}#Receiving resourceVersion {}", resource.getKind(), lastSyncResourceVersion.get());
}

@Override
public void onClose(KubernetesClientException exception) {
  log.error("Watch closing");
  Optional.ofNullable(exception)
    .map(e -> {
      log.debug("Exception received during watch", e);
      return exception;
    })
    .map(KubernetesClientException::getStatus)
    .map(Status::getCode)
    .filter(c -> c.equals(HttpURLConnection.HTTP_GONE))
    .ifPresent(c -> onHttpGone.run());
  onClose.run();
}

In eventReceived, every event is added to the store, which is exactly the FIFO queue. In onClose, we can see that on HTTP_GONE (which is precisely the too old resource version case) it runs onHttpGone.run() and then onClose.run(), where

onHttpGone is Reflector's reListAndSync method and onClose is Reflector's startWatcher method. In other words, whenever this watcher is closed, it re-lists and starts watching again.
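
Sketching the net effect in one place (a paraphrase, not the library's literal code; the two function parameters stand in for Reflector's reListAndSync and startWatcher):

import java.net.HttpURLConnection
import io.fabric8.kubernetes.client.KubernetesClientException

// On 410 Gone, re-list first (which resets lastSyncResourceVersion),
// then restart the watch; on any other close, just restart the watch.
def onWatchClosed(cause: KubernetesClientException,
                  reListAndSync: () => Unit,
                  startWatcher: () => Unit): Unit = {
  val isGone = Option(cause)
    .flatMap(c => Option(c.getStatus))
    .map(_.getCode.intValue)
    .contains(HttpURLConnection.HTTP_GONE)
  if (isGone) reListAndSync()
  startWatcher()
}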


Note


In kubernetes-client:4.6.2, WatchConnectionManager#onMessage handles HTTP_GONE differently:

if (status.getCode() == HTTP_GONE) {
  logger.info("The resource version {} no longer exists. Scheduling a reconnect.", resourceVersion.get());
  resourceVersion.set(null);
  scheduleReconnect();
} else {
  logger.error("Error received: {}", status.toString());
}

Once HTTP_GONE occurs, it sets resourceVersion to null (so the watch picks up from the latest events) and reconnects immediately. In 4.13.0 and 4.2.0, by contrast, the plain watch does not reconnect on its own; handling is left to the user.
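
So on 4.2.0/4.13.0, if you stay on the plain watch instead of the informer, you have to do the equivalent yourself. A minimal sketch, where the two injected callbacks are hypothetical: onPod would be snapshotsStore.updatePod in our code, and restartWatch would re-list and then re-run the watch() call from the start of this article:

import java.net.HttpURLConnection
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

class TaskPodsWatcher(onPod: Pod => Unit, restartWatch: () => Unit)
    extends Watcher[Pod] {

  override def eventReceived(action: Watcher.Action, pod: Pod): Unit =
    onPod(pod)

  override def onClose(cause: KubernetesClientException): Unit = {
    // 410 Gone: our recorded resourceVersion has been compacted away in
    // etcd, so forget it and watch again from the current state.
    if (cause != null && cause.getCode == HttpURLConnection.HTTP_GONE) {
      restartWatch()
    }
  }
}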
