kubernetes的rolling update机制解析

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: commit: d577db99873cbf04b8e17b78f17ec8f3a27eca30 Date: Fri Apr 10 23:45:36 2015 -0700 ##0.命令行和依赖的基础知识 Synopsis Perform a rolling update of the given ReplicationController.

commit: d577db99873cbf04b8e17b78f17ec8f3a27eca30 Date: Fri Apr 10 23:45:36 2015 -0700

##0.命令行和依赖的基 础知识

Synopsis

Perform a rolling update of the given ReplicationController.

Replaces the specified controller with new controller, updating one pod at a time to use the
new PodTemplate. The new-controller.json must specify the same namespace as the
existing controller and overwrite at least one (common) label in its replicaSelector.


kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC

Examples

// Update pods of frontend-v1 using new controller data in frontend-v2.json.
$ kubectl rolling-update frontend-v1 -f frontend-v2.json

// Update pods of frontend-v1 using JSON data passed into stdin.
$ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -

ReplicationController,简称rc,是kubernet体系中某一种类型pod的集合,rc有一个关键参数叫做replicas,也是就是pod的数量。

那么rc有什么用呢?这是为了解决在集群上一堆pod中有些如果挂了,那么就在别的宿主机上把容器启动起来,并让业务流量导入到正确启动的pod上。也就是说,rc保证了集群服务的可用性,当你有很多个服务启动在一个集群中,你需要用程序去监控这些服务的运行状况,并动态保证服务可用。

rc和pod的对应关系是怎么样的?rc通过selector来选择一些pod作为他的控制范围。只要pod的标签(label)符合seletor,则属于这个rc,下面是pod和rc的示例。

xx-controller.json

 "spec":{
 "replicas":1,
 "selector":{
 "name":"redis",
 "role":"master"
 },

xx-pod.json

 "labels": {
 "name": "redis"
 },

kubernetes被我们简称为k8s,如果对其中的基础概念有兴趣可以看这篇

##1.kubctl入口

/cmd/kubectl/kubctl.go

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU())
	cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}

##2.实际调用

源代码在pkg包内,/pkg/kubectl/cmd/cmd.go,每个子命令都实现统一的接口,rollingupdate这行是:

	cmds.AddCommand(NewCmdRollingUpdate(f, out))

这个函数的实现在:/pkg/kubectl/cmd/rollingupdate.go

func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command {
	cmd := &cobra.Command{
		Use: "rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC",
		// rollingupdate is deprecated.
		Aliases: []string{"rollingupdate"},
		Short: "Perform a rolling update of the given ReplicationController.",
		Long: rollingUpdate_long,
		Example: rollingUpdate_example,
		Run: func(cmd *cobra.Command, args []string) {
			err := RunRollingUpdate(f, out, cmd, args)
			cmdutil.CheckErr(err)
		},
	}
}

可以看到实际调用时的执行函数是RunRollingUpdate,算是进入正题了

func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error {
...
	mapper, typer := f.Object()
	// TODO: use resource.Builder instead
	obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
		NamespaceParam(cmdNamespace).RequireNamespace().
		FilenameParam(filename).
		Do().
		Object()
	if err != nil {
		return err
	}
	newRc, ok := obj.(*api.ReplicationController)
	if !ok {
		return cmdutil.UsageError(cmd, "%s does not specify a valid ReplicationController", filename)
	}

这是建立一个新的rc的代码,其中resource是kubneter所有资源(pod,service,rc)的基类。可以看到新的rc从json参数文件中获取所有信息,然后转义为ReplicationController这个类。

 if oldName == newName {
		return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s",
			filename, oldName)
	}

	var hasLabel bool
	for key, oldValue := range oldRc.Spec.Selector {
		if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue {
			hasLabel = true
			break
		}
	}
	if !hasLabel {
		return cmdutil.UsageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s",
			filename, oldName)
	}

这里可以看到,对于新的rc和旧的rc,有2项限制,一个是新旧名字需要不同,另一个是rc的selector中需要至少有一项的值不一样。

	updater := kubectl.NewRollingUpdater(newRc.Namespace, client)

	// fetch rc
	oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName)
	if err != nil {
		return err
	}
...
	err = updater.Update(out, oldRc, newRc, period, interval, timeout)
	if err != nil {
		return err
	}

在做rolling update的时候,有两个条件限制,一个是新的rc的名字需要和旧的不一样,第二是至少有个一个标签的值不一样。其中namespace是k8s用来做多租户资源隔离的,可以先忽略不计。

##3. 数据结构和实现

这段代码出现了NewRollingUpdater,是在上一层的/pkg/kubectl/rollingupdate.go这个文件中,更加接近主体了

// RollingUpdater provides methods for updating replicated pods in a predictable, // fault-tolerant way.
type RollingUpdater struct { // Client interface for creating and updating controllers
	c client.Interface
	// Namespace for resources
	ns string
}

可以看到这里的RollingUpdater里面是一个k8s的client的结构来向api server发送命令

func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error {
	oldName := oldRc.ObjectMeta.Name
	newName := newRc.ObjectMeta.Name
	retry := &RetryParams{interval, timeout}
	waitForReplicas := &RetryParams{interval, timeout}
	if newRc.Spec.Replicas <= 0 {
		return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec)
	}
	desired := newRc.Spec.Replicas
	sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)

	// look for existing newRc, incase this update was previously started but interrupted
	rc, existing, err := r.getExistingNewRc(sourceId, newName)
	if existing {
		fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName)
		if err != nil {
			return err
		}
		replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation]
		desired, err = strconv.Atoi(replicas)
		if err != nil {
			return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
				newName, desiredReplicasAnnotation, replicas)
		}
		newRc = rc
	} else {
		fmt.Fprintf(out, "Creating %s\n", newName)
		if newRc.ObjectMeta.Annotations == nil {
			newRc.ObjectMeta.Annotations = map[string]string{}
		}
		newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired)
		newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId
		newRc.Spec.Replicas = 0
		newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc)
		if err != nil {
			return err
		}
	}

	// +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas
	for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 {
		newRc.Spec.Replicas += 1
		oldRc.Spec.Replicas -= 1
		fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n",
			oldName, oldRc.Spec.Replicas,
			newName, newRc.Spec.Replicas)
		fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n",
			oldName, oldRc.Spec.Replicas,
			newName, newRc.Spec.Replicas)

		newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
		if err != nil {
			return err
		}
		time.Sleep(updatePeriod)
		oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
		if err != nil {
			return err
		}
		fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n",
			oldName, oldRc.Spec.Replicas,
			newName, newRc.Spec.Replicas)
	}
	// delete remaining replicas on oldRc
	if oldRc.Spec.Replicas != 0 {
		fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n",
			oldName, oldRc.Spec.Replicas, 0)
		oldRc.Spec.Replicas = 0
		oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas)
		// oldRc, err = r.resizeAndWait(oldRc, interval, timeout)
		if err != nil {
			return err
		}
	}
	// add remaining replicas on newRc
	if newRc.Spec.Replicas != desired {
		fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n",
			newName, newRc.Spec.Replicas, desired)
		newRc.Spec.Replicas = desired
		newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas)
		if err != nil {
			return err
		}
	}
	// Clean up annotations
	if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil {
		return err
	}
	delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation)
	delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation)
	newRc, err = r.updateAndWait(newRc, interval, timeout)
	if err != nil {
		return err
	}
	// delete old rc
	fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName)
	return r.c.ReplicationControllers(r.ns).Delete(oldName)
}

这段代码很长,但做的事情很简单:

  1. 如果新的rc没有被创建,就先创一下,如果已经创建了(在上次的rolling_update中创建了但超时了)
  2. 用几个循环,把新的rc的replicas增加上去,旧的rc的replicas降低下来,主要调用的函数是resizeAndWait和updateAndWait

##4. 底层调用

接上一节的resizeAndWait,代码在/pkg/kubectl/resize.go,这里的具体代码就不贴了 其余的所有调用都发生/pkg/client这个目录下,这是一个http/json的client,主要功能就是向api-server发送请求 整体来说,上面的wait的实现都是比较土的,就是发一个update请求过去,后面轮询的调用get来检测状态是否符合最终需要的状态。

##5. 总结

先说一下这三个时间参数的作用:

update-period:新rc增加一个pod后,等待这个period,然后从旧rc缩减一个pod poll-interval:这个函数名来源于linux上的poll调用,就是每过一个poll-interval,向服务端发起请求,直到这个请求成功或者报失败 timeout:总操作的超时时间

rolling update主要是客户端这边实现的,分析完了,但还是有一些未知的问题,例如:

  1. api-server, cadvisor, kubelet, proxy, etcd这些服务端组件是怎么交互的?怎么保证在服务一直可用的情况下增减pod?
  2. 是否有可能在pod增减的时候插入自己的一些代码或者过程?因为我们目前的架构中没有使用k8s的proxy,需要自己去调用负载均衡的系统给这些pod导流量
  3. 对于具体的pod,我们怎么去做内部程序的健康检查?在业务不可用的情况下向k8s系统发送消息,干掉这个pod,在别的机器上创建新的来替代。
本文转移开源中国- kubernetes的rolling update机制解析
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
4天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
16 2
|
1月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
67 3
|
6天前
|
存储 消息中间件 算法
深入探索操作系统的心脏——内核机制解析
本文旨在揭示操作系统核心——内核的工作原理,通过剖析其关键组件与机制,为读者提供一个清晰的内核结构图景。不同于常规摘要的概述性内容,本文摘要将直接聚焦于内核的核心概念、主要功能以及其在系统管理中扮演的角色,旨在激发读者对操作系统深层次运作原理的兴趣与理解。
|
10天前
|
运维 Kubernetes Cloud Native
Kubernetes云原生架构深度解析与实践指南####
本文深入探讨了Kubernetes作为领先的云原生应用编排平台,其设计理念、核心组件及高级特性。通过剖析Kubernetes的工作原理,结合具体案例分析,为读者呈现如何在实际项目中高效部署、管理和扩展容器化应用的策略与技巧。文章还涵盖了服务发现、负载均衡、配置管理、自动化伸缩等关键议题,旨在帮助开发者和运维人员掌握利用Kubernetes构建健壮、可伸缩的云原生生态系统的能力。 ####
|
18天前
|
存储 缓存 安全
🌟Java零基础:深入解析Java序列化机制
【10月更文挑战第20天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
22 3
|
23天前
|
Java 开发者 UED
Java编程中的异常处理机制解析
在Java的世界里,异常处理是确保程序稳定性和可靠性的关键。本文将深入探讨Java的异常处理机制,包括异常的类型、如何捕获和处理异常以及自定义异常的创建和使用。通过理解这些概念,开发者可以编写更加健壮和易于维护的代码。
|
8天前
|
存储 Kubernetes 调度
深度解析Kubernetes中的Pod生命周期管理
深度解析Kubernetes中的Pod生命周期管理
中断处理机制解析
【10月更文挑战第5天】中断处理需定义中断处理函数`irq_handler_t`,参数包括中断信号`irq`和通用指针`dev_id`。返回值`IRQ_NONE`表示非本设备中断,`IRQ_HANDLED`表示已处理,`IRQ_WAKE_THREAD`表示需唤醒等待进程。处理程序常分上下半部,关键部分在中断处理函数中完成,延迟部分通过工作队列处理。注册中断处理函数需调用`request_irq`,参数包括中断信号、处理函数、标志位、设备名和通用指针。
|
25天前
|
存储 Kubernetes 监控
深度解析Kubernetes在微服务架构中的应用与优化
【10月更文挑战第18天】深度解析Kubernetes在微服务架构中的应用与优化
95 0
|
30天前
|
JavaScript 前端开发 开发者
原型链深入解析:JavaScript中的核心机制
【10月更文挑战第13天】原型链深入解析:JavaScript中的核心机制
28 0

推荐镜像

更多