kubectl exec 在kubelet中的处理流程

简介: kubectl exec 在kubelet中的处理流程

获取streaming URL

kubelet在启动时会初始化一个serve,注册好对应的handler, exec的handler如下:

 
    ws = new(restful.WebService)
    ws.
        Path("/exec")
    ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    s.restfulCont.Add(ws)

所有的路径最后都由getExec来进行处理:

// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
    params := getExecRequestParams(request)
    streamOpts, err := remotecommandserver.NewOptions(request.Request)
    if err != nil {
        utilruntime.HandleError(err)
        response.WriteError(http.StatusBadRequest, err)
        return
    }
    pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
    if !ok {
        response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
        return
    }
 
    podFullName := kubecontainer.GetPodFullName(pod)
    url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
    if err != nil {
        streaming.WriteError(err, response.ResponseWriter)
        return
    }
    if s.redirectContainerStreaming {
        http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
        return
    }
    proxyStream(response.ResponseWriter, request.Request, url)
}

以上代码可以看出,首先调用host.GetExec获取URL,然后判断是否开启重定向,如果开启则进行重定向,否则直接代理请求到该streaming URL。这里host对象对应的实现其实就是kubelet, 我们看下GetExec的实现:

// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
    container, err := kl.findContainer(podFullName, podUID, containerName)
    if err != nil {
        return nil, err
    }
    if container == nil {
        return nil, fmt.Errorf("container not found (%q)", containerName)
    }
    return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
}

这里只是继续调用streamingRuntime的GetExec方法,streamingRuntime是个interface,具体的实现是kubeGenericRuntimeManager

// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
    req := &runtimeapi.ExecRequest{
        ContainerId: id.ID,
        Cmd:         cmd,
        Tty:         tty,
        Stdin:       stdin,
        Stdout:      stdout,
        Stderr:      stderr,
    }
    resp, err := m.runtimeService.Exec(req)
    if err != nil {
        return nil, err
    }
 
    return url.Parse(resp.Url)
}

继而调用了runtimeService.Exec方法, 此处runtimeService根据CRI创建的remoteRuntimeService,简单来说就是对应CRI server的client端

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
    ctx, cancel := getContextWithTimeout(r.timeout)
    defer cancel()
 
    resp, err := r.runtimeClient.Exec(ctx, req)
    if err != nil {
        klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
        return nil, err
    }
 
    if resp.Url == "" {
        errorMessage := "URL is not set"
        klog.Errorf("Exec failed: %s", errorMessage)
        return nil, errors.New(errorMessage)
    }
 
    return resp, nil
}

调用cri client请求cri server端,在这里server端就是docker shim (docker Service对象)

// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
    if ds.streamingServer == nil {
        return nil, streaming.NewErrorStreamingDisabled("exec")
    }
    _, err := checkContainerStatus(ds.client, req.ContainerId)
    if err != nil {
        return nil, err
    }
    return ds.streamingServer.GetExec(req)
}

调用dockerService.StreamingSerer的GetExec方法,streamingServer的所有方法都定义在:pkg/kubelet/server/streaming/server.go里

func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
    if err := validateExecRequest(req); err != nil {
        return nil, err
    }
    token, err := s.cache.Insert(req)
    if err != nil {
        return nil, err
    }
    return &runtimeapi.ExecResponse{
        Url: s.buildURL("exec", token),
    }, nil
}

可以看到这里只是返回一个简单的token组合成的url, 之所以生成一个token是因为用户的命令中可能包含各种各样的字符,各种长度的字符,需要格式化为一个简单的token。 该token会缓存在本地,后面真正的exec请求会携带这该token,通过该token找到之前的具体请求。

处理streaming请求

在获取到该exec真正的URL后,就需要通过该URL来获取真正的数据了。为该URL提供服务的sever一般位于CRI的实现之中,例如docker shim会创建一个streamingServer来提供服务。

各个运行时 streaming server 的处理框架都是类似的,kublet为了方便各runtime实现CRI接口,提供了统一的包,位于:pkg/kubelet/server/streaming/server.go。 各种底层runtime只需要实现其中的steaming.Runtime接口就可以简单创建一个streamingServer:

// Runtime is the interface to execute the commands and provide the streams.
type Runtime interface {
    Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
    Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
    PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
}

目前kubelet内置了docker runtime的实现:dockershim,在dockershim中streaming.Runtime的实现结构体为streamingRuntime:

type streamingRuntime struct {
    client      libdocker.Interface
    execHandler ExecHandler
}
 
var _ streaming.Runtime = &streamingRuntime{}

其中docker client作为成员函数,以便后面请求docker获取数据。


NOTE

注意区分kubelet中各种runtime的定义,说实话各种runtime确实挺乱的,我们需要明确各种Runtime是定义在哪个scope下的,streaming.Runtime是个interface, 位于pkg/kubelet/server/streaming/server.go,用来定义流处理请求的所需要的动作。streamingRuntime是dockershim对streaming.Runtime interface的具体实现,位于pkg/kubelet/dockershim/docker_streaming.go,该结构体为private的。 另一个比较容易混淆的是pkg/kubelet/container/runtime.go (俗称kubecontainer)中的StreamingRuntime, 该interface为public的,用来定义GetExec/GetAttach/GetPortForward接口


利用该streaming.Runtime就可以创建streamingServer了:

 
// NewServer creates a new Server for stream requests.
// TODO(tallclair): Add auth(n/z) interface & handling.
func NewServer(config Config, runtime Runtime) (Server, error) {
    s := &server{
        config:  config,
        runtime: &criAdapter{runtime},
        cache:   newRequestCache(),
    }
 
    if s.config.BaseURL == nil {
        s.config.BaseURL = &url.URL{
            Scheme: "http",
            Host:   s.config.Addr,
        }
        if s.config.TLSConfig != nil {
            s.config.BaseURL.Scheme = "https"
        }
    }
 
    ws := &restful.WebService{}
    endpoints := []struct {
        path    string
        handler restful.RouteFunction
    }{
        {"/exec/{token}", s.serveExec},
        {"/attach/{token}", s.serveAttach},
        {"/portforward/{token}", s.servePortForward},
    }
    // If serving relative to a base path, set that here.
    pathPrefix := path.Dir(s.config.BaseURL.Path)
    for _, e := range endpoints {
        for _, method := range []string{"GET", "POST"} {
            ws.Route(ws.
                Method(method).
                Path(path.Join(pathPrefix, e.path)).
                To(e.handler))
        }
    }
    handler := restful.NewContainer()
    handler.Add(ws)
    s.handler = handler
    s.server = &http.Server{
        Addr:      s.config.Addr,
        Handler:   s.handler,
        TLSConfig: s.config.TLSConfig,
    }
 
    return s, nil
}

NewServer中会注册对应的handler来处理/exec/{token}类接口。这里exec的handdler为ServerExec方法:

func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
    token := req.PathParameter("token")
    cachedRequest, ok := s.cache.Consume(token)
    if !ok {
        http.NotFound(resp.ResponseWriter, req.Request)
        return
    }
    exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
    if !ok {
        http.NotFound(resp.ResponseWriter, req.Request)
        return
    }
 
    streamOpts := &remotecommandserver.Options{
        Stdin:  exec.Stdin,
        Stdout: exec.Stdout,
        Stderr: exec.Stderr,
        TTY:    exec.Tty,
    }
 
    remotecommandserver.ServeExec(
        resp.ResponseWriter,
        req.Request,
        s.runtime,
        "", // unused: podName
        "", // unusued: podUID
        exec.ContainerId,
        exec.Cmd,
        streamOpts,
        s.config.StreamIdleTimeout,
        s.config.StreamCreationTimeout,
        s.config.SupportedRemoteCommandProtocols)
}

进而调用ServeExec方法:

// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
    ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
    if !ok {
        // error is handled by createStreams
        return
    }
    defer ctx.conn.Close()
 
    err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
    if err != nil {
        if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
            rc := exitErr.ExitStatus()
            ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
                Status: metav1.StatusFailure,
                Reason: remotecommandconsts.NonZeroExitCodeReason,
                Details: &metav1.StatusDetails{
                    Causes: []metav1.StatusCause{
                        {
                            Type:    remotecommandconsts.ExitCodeCauseType,
                            Message: fmt.Sprintf("%d", rc),
                        },
                    },
                },
                Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
            }})
        } else {
            err = fmt.Errorf("error executing command in container: %v", err)
            runtime.HandleError(err)
            ctx.writeStatus(apierrors.NewInternalError(err))
        }
    } else {
        ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
            Status: metav1.StatusSuccess,
        }})
    }
}
 

在remotecommandserver.ServeExec中调用了executer.ExecInContainer 方法, 该executer接口的实现是criAdapter, criAdapter只是Runtime的一个wrapper,真正调用的是Runtime.Exec, Runtime是个interface,我们来看下具体在dockershim中的实现:

func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
    return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}
 
// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    container, err := checkContainerStatus(r.client, containerID)
    if err != nil {
        return err
    }
    return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
}

其中execHandler是在初始化streamRuntime的时候定义的NativeExecHandler, 可以看到是直接调用libdocker api与docker进行交互

// NativeExecHandler executes commands in Docker containers using Docker's exec API.
type NativeExecHandler struct{}
 
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
    done := make(chan struct{})
    defer close(done)
 
    createOpts := dockertypes.ExecConfig{
        Cmd:          cmd,
        AttachStdin:  stdin != nil,
        AttachStdout: stdout != nil,
        AttachStderr: stderr != nil,
        Tty:          tty,
    }
    execObj, err := client.CreateExec(container.ID, createOpts)
    if err != nil {
        return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
    }
 
    // Have to start this before the call to client.StartExec because client.StartExec is a blocking
    // call :-( Otherwise, resize events don't get processed and the terminal never resizes.
    //
    // We also have to delay attempting to send a terminal resize request to docker until after the
    // exec has started; otherwise, the initial resize request will fail.
    execStarted := make(chan struct{})
    go func() {
        select {
        case <-execStarted:
            // client.StartExec has started the exec, so we can start resizing
        case <-done:
            // ExecInContainer has returned, so short-circuit
            return
        }
 
        kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) {
            client.ResizeExecTTY(execObj.ID, uint(size.Height), uint(size.Width))
        })
    }()
 
    startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
    streamOpts := libdocker.StreamOptions{
        InputStream:  stdin,
        OutputStream: stdout,
        ErrorStream:  stderr,
        RawTerminal:  tty,
        ExecStarted:  execStarted,
    }
    err = client.StartExec(execObj.ID, startOpts, streamOpts)
    if err != nil {
        return err
    }
 
    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()
    count := 0
    for {
        inspect, err2 := client.InspectExec(execObj.ID)
        if err2 != nil {
            return err2
        }
        if !inspect.Running {
            if inspect.ExitCode != 0 {
                err = &dockerExitError{inspect}
            }
            break
        }
 
        count++
        if count == 5 {
            klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
            break
        }
 
        <-ticker.C
    }
 
    return err
}
 

至此整个处理流程就结束了。

其实整个流程也比较简单,就是各个runtime shim实现一个kubelet定义好的iterface streaming.Runtime, 然后就可以利用kubelet提供的一个统一的工具package简单实现一个streaming server。该server负责两件事情,1. getExec: 首先根据用户请求的命令返回一个带有token的url,重定向用户请求到该url。2. serveExec: 随后真正提供exec的服务,该exec调用各个runtime shim的具体实现.。

关于 RedirectContainerStreaming

在前面提到,如果kubelet开启了RedirectContainerStreaming,则kubelet会将streaming URL返回给apiserver, 随后apiserver会重定向到该streaming URL。这样设计是为了避免所有的流式请求都经过kubelet对kubelet造成压力, 但是从另外一方面考虑,这样做的缺点是无法使用kubelet的认证功能。

那接下来我们仔细探究一下该参数所起到的真正作用,kubelet中默认runtime是docker,所以这里的研究对象就是dockershim:

一: 如果将RedirectContainerStreaming参数设置为true 则返回的URL类似于/cri/exec/aRbQe4pn,可以看到这里的域名默认是没有hostname的,则重定向时默认重定向到原来的hostname,即kubelet监听的hostname。 所以其实dockershim作为默认的runtime时,设置RedirectContainerStreaming为true并不会有什么本质的区别,对kubelet的性能影响并没有减少,因为所有的流处理还是经过了kubelet。

上述/cri/exec/aRbQe4pn这个路径在kubelet server中的对应handler为criHandler。 kubelet启动的时候会对该criHandler进行赋值,将dockerService赋值给criHandler (此处docker service其实就是docker shim)

if crOptions.RedirectContainerStreaming {
    klet.criHandler = ds
}

dockerService中的实现为:

func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if ds.streamingServer != nil {
        ds.streamingServer.ServeHTTP(w, r)
    } else {
        http.NotFound(w, r)
    }
}

简单调用了streamingSever的ServeHTTP方法:

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.handler.ServeHTTP(w, r)
}

在streamingServer会根据注册的handler进行处理请, 这就又回到了我们上面提到的第二步: 提供streaming请求

整个workflow如下:

二: 如果设置RedirectContainerStreaming为false 则此时第一步获取到的streaming URL形如:http://127.0.0.1:36699/exec/8rYzmQK9。 可以看到这里是带有hostname的。 因为此时kubelet并不会将该URL返回给apiserver,会直接请求该URL进行代理,如此一来就可以通过127.0.0.1进行直接通信,这个localhost的端口必然是由docker shim来进行监听:

// Start initializes and starts components in dockerService.
func (ds *dockerService) Start() error {
    ds.initCleanup()
 
    // Initialize the legacy cleanup flag.
    if ds.startLocalStreamingServer {
        go func() {
            if err := ds.streamingServer.Start(true); err != nil {
                klog.Fatalf("Streaming server stopped unexpectedly: %v", err)
            }
        }()
    }
    return ds.containerManager.Start()
}

在dockerShim中调用了streamingServer的Start方法启动监听端口:

func (s *server) Start(stayUp bool) error {
    if !stayUp {
        // TODO(tallclair): Implement this.
        return errors.New("stayUp=false is not yet implemented")
    }
 
    listener, err := net.Listen("tcp", s.config.Addr)
    if err != nil {
        return err
    }
    // Use the actual address as baseURL host. This handles the "0" port case.
    s.config.BaseURL.Host = listener.Addr().String()
    if s.config.TLSConfig != nil {
        return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
    }
    return s.server.Serve(listener)
}

通过启动一个监听在localhost的sever, 这就又回到了我们上面提到的第二步: 提供streaming请求

整个workflow如下:

仔细观察这两幅workflow图片的差别就会发现, RedirectContainerStreaming 在默认的dockershim中并没有实质的作用,此时还是建议将该值设置为false来提供流处理请求的认证功能。

Reference

Kubernetes 容器运行时演进

How does 'kubectl exec' work?

CRI Streaming Requests

kubernetes PR#38742: [CRI] Don't include user data in CRI streaming redirect URLs

相关文章
|
1月前
|
Kubernetes Perl 容器
kubernetes 的Job 的并行执行 配置
在Kubernetes中,Job是一种用于批处理任务的Controller对象。如果你想要配置Job以支持并行执行,可以使用Job的`.spec.parallelism`字段。这个字段定义了Job中可以并行运行的Pod的最大数量。 下面是一个简单的Job定义,其中包含了`.spec.parallelism`字段: ```yaml apiVersion: batch/v1 kind: Job metadata: name: example-job spec: parallelism: 3 # 这里定义了并行运行的Pod的数量 completions: 5 # 定义了成功完成的
|
8月前
|
Kubernetes 容器
使用Kubeadm部署K8s集群获取kube-scheduler和kube-controller-manager组件状态异常问题
使用Kubeadm部署K8s集群获取kube-scheduler和kube-controller-manager组件状态异常问题
100 0
|
8月前
|
Kubernetes 监控 容器
kubectl 的作用
kubectl是Kubernetes命令行工具,用于与Kubernetes集群进行交互和管理。它提供了许多命令,用于管理集群、部署应用、监视资源状态、调试问题等。 以下是kubectl的主要功能: 1. 配置管理:kubectl可以用于查看、创建、修改和删除Kubernetes集群的配置信息,如节点、命名空间、服务账号、角色等。 2. 应用管理:kubectl可以用于管理应用程序的部署、扩容、缩放、更新和删除。通过kubectl,您可以创建和管理Pod、Service、Deployment、StatefulSet等资源对象。 3. 资源监控:kubectl可以获取和监视Kubernet
146 0
|
8月前
|
存储 Kubernetes API
【K8s源码品读】007:Phase 1 - kube-apiserver - Pod数据的保存
理解Pod发送到kube-apiserver后是怎么保存的
44 0
|
8月前
|
设计模式 Kubernetes 网络架构
|
9月前
|
Kubernetes 网络协议 Perl
Kubernetes 优雅终止 pod
Kubernetes 优雅终止 pod
112 1
|
8月前
|
Kubernetes Docker 容器
Kubernetes集群部署中安装Pods网络插件一直显示Pending状态解决
Kubernetes集群部署中安装Pods网络插件一直显示Pending状态解决
215 0
|
11月前
|
Perl
18-Kubernetes-Pod控制器详解- Job
18-Kubernetes-Pod控制器详解- Job
|
消息中间件 Kubernetes API
Kubernetes: Job 和 CronJob 的实现原理
Kubernetes: Job 和 CronJob 的实现原理
1067 0
Kubernetes: Job 和 CronJob 的实现原理
|
Kubernetes 网络协议 API
Kubernetes Kubelet 状态更新机制
Kubernetes Kubelet 状态更新机制
Kubernetes Kubelet 状态更新机制