Getting the streaming URL
When kubelet starts, it initializes a server and registers the corresponding handlers. The exec handlers are registered as follows:
    ws = new(restful.WebService)
    ws.
        Path("/exec")
    ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
        To(s.getExec).
        Operation("getExec"))
    s.restfulCont.Add(ws)
All of these routes are ultimately handled by getExec:
    // getExec handles requests to run a command inside a container.
    func (s *Server) getExec(request *restful.Request, response *restful.Response) {
        params := getExecRequestParams(request)
        streamOpts, err := remotecommandserver.NewOptions(request.Request)
        if err != nil {
            utilruntime.HandleError(err)
            response.WriteError(http.StatusBadRequest, err)
            return
        }
        pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
        if !ok {
            response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
            return
        }

        podFullName := kubecontainer.GetPodFullName(pod)
        url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
        if err != nil {
            streaming.WriteError(err, response.ResponseWriter)
            return
        }
        if s.redirectContainerStreaming {
            http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
            return
        }
        proxyStream(response.ResponseWriter, request.Request, url)
    }
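As the code above shows, getExec first calls host.GetExec to obtain the URL and then checks whether redirection is enabled: if it is, the request is redirected; otherwise the request is proxied directly to the streaming URL. The host object here is actually implemented by kubelet itself, so let's look at its GetExec implementation: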
    // GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
    func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
        container, err := kl.findContainer(podFullName, podUID, containerName)
        if err != nil {
            return nil, err
        }
        if container == nil {
            return nil, fmt.Errorf("container not found (%q)", containerName)
        }
        return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
    }
This simply calls the GetExec method of streamingRuntime. streamingRuntime is an interface; its concrete implementation is kubeGenericRuntimeManager:
    // GetExec gets the endpoint the runtime will serve the exec request from.
    func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
        req := &runtimeapi.ExecRequest{
            ContainerId: id.ID,
            Cmd:         cmd,
            Tty:         tty,
            Stdin:       stdin,
            Stdout:      stdout,
            Stderr:      stderr,
        }
        resp, err := m.runtimeService.Exec(req)
        if err != nil {
            return nil, err
        }

        return url.Parse(resp.Url)
    }
This in turn calls runtimeService.Exec. Here runtimeService is the RemoteRuntimeService created for the CRI; put simply, it is the client side of the CRI server:
    // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
    func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
        ctx, cancel := getContextWithTimeout(r.timeout)
        defer cancel()

        resp, err := r.runtimeClient.Exec(ctx, req)
        if err != nil {
            klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
            return nil, err
        }

        if resp.Url == "" {
            errorMessage := "URL is not set"
            klog.Errorf("Exec failed: %s", errorMessage)
            return nil, errors.New(errorMessage)
        }

        return resp, nil
    }
The CRI client sends the request to the CRI server, which in this case is dockershim (the dockerService object):
    // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
    func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
        if ds.streamingServer == nil {
            return nil, streaming.NewErrorStreamingDisabled("exec")
        }
        _, err := checkContainerStatus(ds.client, req.ContainerId)
        if err != nil {
            return nil, err
        }
        return ds.streamingServer.GetExec(req)
    }
This calls the GetExec method of dockerService.streamingServer. All of the streamingServer methods are defined in pkg/kubelet/server/streaming/server.go:
    func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
        if err := validateExecRequest(req); err != nil {
            return nil, err
        }
        token, err := s.cache.Insert(req)
        if err != nil {
            return nil, err
        }
        return &runtimeapi.ExecResponse{
            Url: s.buildURL("exec", token),
        }, nil
    }
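As you can see, this just returns a URL built from a simple token. A token is generated because the user's command may contain arbitrary characters and be of arbitrary length, so it has to be reduced to a simple token. The token is cached locally; the actual exec request that follows carries this token, which is used to look up the original request.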
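The token mechanism described above can be sketched as a small one-time cache. This is a simplified illustration rather than the kubelet's own request cache: the ExecRequest type, the tokenCache name, and the 6-byte token length are assumptions made for the example, and expiry and size limits are left out.

```go
package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
	"sync"
)

// ExecRequest is a hypothetical stand-in for runtimeapi.ExecRequest.
type ExecRequest struct {
	ContainerID string
	Cmd         []string
}

// tokenCache maps one-time tokens to pending requests (hypothetical type).
type tokenCache struct {
	mu      sync.Mutex
	pending map[string]*ExecRequest
}

func newTokenCache() *tokenCache {
	return &tokenCache{pending: make(map[string]*ExecRequest)}
}

// Insert stores req and returns a short random token that is safe to put in a URL.
func (c *tokenCache) Insert(req *ExecRequest) (string, error) {
	buf := make([]byte, 6) // 6 random bytes encode to 8 URL-safe characters
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	token := base64.RawURLEncoding.EncodeToString(buf)
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending[token] = req
	return token, nil
}

// Consume returns the request stored under token and removes it,
// so each token can be used only once.
func (c *tokenCache) Consume(token string) (*ExecRequest, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	req, ok := c.pending[token]
	delete(c.pending, token)
	return req, ok
}

func main() {
	cache := newTokenCache()
	token, err := cache.Insert(&ExecRequest{
		ContainerID: "abc",
		Cmd:         []string{"sh", "-c", "echo 'hi there'"},
	})
	if err != nil {
		panic(err)
	}
	// The URL carries only the token, never the command itself.
	fmt.Println("streaming URL path: /exec/" + token)

	req, ok := cache.Consume(token)
	fmt.Println("first lookup:", ok, req.Cmd)
	_, ok = cache.Consume(token)
	fmt.Println("second lookup:", ok)
}
```

Whatever characters the command contains, the URL stays short and URL-safe, and a token that has already been consumed no longer resolves to a request.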
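Handling streaming requests
Once the real URL for the exec has been obtained, the actual data has to be fetched through it. The server behind this URL usually lives inside the CRI implementation; for example, dockershim creates a streamingServer to serve it.
The streaming servers of the various runtimes all follow a similar framework. To make it easier for each runtime to implement the CRI interface, kubelet provides a common package at pkg/kubelet/server/streaming/server.go. An underlying runtime only needs to implement the streaming.Runtime interface from that package to create a streamingServer: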
    // Runtime is the interface to execute the commands and provide the streams.
    type Runtime interface {
        Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
        Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
        PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error
    }
kubelet currently ships with a built-in implementation for the docker runtime, dockershim. In dockershim, the struct that implements streaming.Runtime is streamingRuntime:
    type streamingRuntime struct {
        client      libdocker.Interface
        execHandler ExecHandler
    }

    var _ streaming.Runtime = &streamingRuntime{}
The docker client is held as a field so that it can later be used to request data from docker.
NOTE
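Be careful to distinguish the various runtime definitions in kubelet. Frankly, they are rather confusing, so it helps to be clear about the scope in which each Runtime is defined. streaming.Runtime is an interface located in pkg/kubelet/server/streaming/server.go that defines the actions needed to handle streaming requests. streamingRuntime is dockershim's concrete implementation of the streaming.Runtime interface, located in pkg/kubelet/dockershim/docker_streaming.go; this struct is private (unexported). Another one that is easy to confuse is StreamingRuntime in pkg/kubelet/container/runtime.go (commonly referred to as kubecontainer); that interface is public (exported) and defines the GetExec/GetAttach/GetPortForward methods.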
With this streaming.Runtime, the streamingServer can be created:
    // NewServer creates a new Server for stream requests.
    // TODO(tallclair): Add auth(n/z) interface & handling.
    func NewServer(config Config, runtime Runtime) (Server, error) {
        s := &server{
            config:  config,
            runtime: &criAdapter{runtime},
            cache:   newRequestCache(),
        }

        if s.config.BaseURL == nil {
            s.config.BaseURL = &url.URL{
                Scheme: "http",
                Host:   s.config.Addr,
            }
            if s.config.TLSConfig != nil {
                s.config.BaseURL.Scheme = "https"
            }
        }

        ws := &restful.WebService{}
        endpoints := []struct {
            path    string
            handler restful.RouteFunction
        }{
            {"/exec/{token}", s.serveExec},
            {"/attach/{token}", s.serveAttach},
            {"/portforward/{token}", s.servePortForward},
        }
        // If serving relative to a base path, set that here.
        pathPrefix := path.Dir(s.config.BaseURL.Path)
        for _, e := range endpoints {
            for _, method := range []string{"GET", "POST"} {
                ws.Route(ws.
                    Method(method).
                    Path(path.Join(pathPrefix, e.path)).
                    To(e.handler))
            }
        }
        handler := restful.NewContainer()
        handler.Add(ws)
        s.handler = handler
        s.server = &http.Server{
            Addr:      s.config.Addr,
            Handler:   s.handler,
            TLSConfig: s.config.TLSConfig,
        }

        return s, nil
    }
NewServer registers handlers for the /exec/{token} family of endpoints. The handler for exec is the serveExec method:
    func (s *server) serveExec(req *restful.Request, resp *restful.Response) {
        token := req.PathParameter("token")
        cachedRequest, ok := s.cache.Consume(token)
        if !ok {
            http.NotFound(resp.ResponseWriter, req.Request)
            return
        }
        exec, ok := cachedRequest.(*runtimeapi.ExecRequest)
        if !ok {
            http.NotFound(resp.ResponseWriter, req.Request)
            return
        }

        streamOpts := &remotecommandserver.Options{
            Stdin:  exec.Stdin,
            Stdout: exec.Stdout,
            Stderr: exec.Stderr,
            TTY:    exec.Tty,
        }

        remotecommandserver.ServeExec(
            resp.ResponseWriter,
            req.Request,
            s.runtime,
            "", // unused: podName
            "", // unusued: podUID
            exec.ContainerId,
            exec.Cmd,
            streamOpts,
            s.config.StreamIdleTimeout,
            s.config.StreamCreationTimeout,
            s.config.SupportedRemoteCommandProtocols)
    }
This in turn calls ServeExec:
    // ServeExec handles requests to execute a command in a container. After
    // creating/receiving the required streams, it delegates the actual execution
    // to the executor.
    func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
        ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout)
        if !ok {
            // error is handled by createStreams
            return
        }
        defer ctx.conn.Close()

        err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
        if err != nil {
            if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
                rc := exitErr.ExitStatus()
                ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
                    Status: metav1.StatusFailure,
                    Reason: remotecommandconsts.NonZeroExitCodeReason,
                    Details: &metav1.StatusDetails{
                        Causes: []metav1.StatusCause{
                            {
                                Type:    remotecommandconsts.ExitCodeCauseType,
                                Message: fmt.Sprintf("%d", rc),
                            },
                        },
                    },
                    Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
                }})
            } else {
                err = fmt.Errorf("error executing command in container: %v", err)
                runtime.HandleError(err)
                ctx.writeStatus(apierrors.NewInternalError(err))
            }
        } else {
            ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
                Status: metav1.StatusSuccess,
            }})
        }
    }
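remotecommandserver.ServeExec calls executor.ExecInContainer. The Executor interface is implemented here by criAdapter, which is just a wrapper around Runtime; what actually gets called is Runtime.Exec. Runtime is an interface, so let's look at its concrete implementation in dockershim: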
    func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
        return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
    }

    // Internal version of Exec adds a timeout.
    func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
        container, err := checkContainerStatus(r.client, containerID)
        if err != nil {
            return err
        }
        return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout)
    }
Here execHandler is the NativeExecHandler set when streamingRuntime is initialized. As you can see, it calls the libdocker API directly to interact with docker:
    // NativeExecHandler executes commands in Docker containers using Docker's exec API.
    type NativeExecHandler struct{}

    func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
        done := make(chan struct{})
        defer close(done)

        createOpts := dockertypes.ExecConfig{
            Cmd:          cmd,
            AttachStdin:  stdin != nil,
            AttachStdout: stdout != nil,
            AttachStderr: stderr != nil,
            Tty:          tty,
        }
        execObj, err := client.CreateExec(container.ID, createOpts)
        if err != nil {
            return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err)
        }

        // Have to start this before the call to client.StartExec because client.StartExec is a blocking
        // call :-( Otherwise, resize events don't get processed and the terminal never resizes.
        //
        // We also have to delay attempting to send a terminal resize request to docker until after the
        // exec has started; otherwise, the initial resize request will fail.
        execStarted := make(chan struct{})
        go func() {
            select {
            case <-execStarted:
                // client.StartExec has started the exec, so we can start resizing
            case <-done:
                // ExecInContainer has returned, so short-circuit
                return
            }

            kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) {
                client.ResizeExecTTY(execObj.ID, uint(size.Height), uint(size.Width))
            })
        }()

        startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
        streamOpts := libdocker.StreamOptions{
            InputStream:  stdin,
            OutputStream: stdout,
            ErrorStream:  stderr,
            RawTerminal:  tty,
            ExecStarted:  execStarted,
        }
        err = client.StartExec(execObj.ID, startOpts, streamOpts)
        if err != nil {
            return err
        }

        ticker := time.NewTicker(2 * time.Second)
        defer ticker.Stop()
        count := 0
        for {
            inspect, err2 := client.InspectExec(execObj.ID)
            if err2 != nil {
                return err2
            }
            if !inspect.Running {
                if inspect.ExitCode != 0 {
                    err = &dockerExitError{inspect}
                }
                break
            }

            count++
            if count == 5 {
                klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID)
                break
            }

            <-ticker.C
        }

        return err
    }
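That concludes the whole processing flow.
The overall flow is actually fairly simple: each runtime shim implements streaming.Runtime, an interface defined by kubelet, and can then use the common utility package provided by kubelet to build a streaming server with little effort. That server is responsible for two things. 1. getExec: given the command requested by the user, return a URL containing a token, to which the user's request is then redirected or proxied. 2. serveExec: actually serve the exec, which calls the concrete implementation in each runtime shim.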
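The two responsibilities summarized above can be tried out end to end with a toy server. This is only a sketch under simplifying assumptions: the Runtime interface here is a cut-down, hypothetical version with a single output stream, the miniServer and echoRuntime types are invented for the example, and tokens are sequential rather than random so that the example stays short.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// Runtime is a simplified, hypothetical version of streaming.Runtime:
// output only, with no stdin, TTY, or resize handling.
type Runtime interface {
	Exec(containerID string, cmd []string, out io.Writer) error
}

// echoRuntime is a fake runtime shim that just echoes the command.
type echoRuntime struct{}

func (echoRuntime) Exec(containerID string, cmd []string, out io.Writer) error {
	_, err := fmt.Fprintf(out, "%s: %s", containerID, strings.Join(cmd, " "))
	return err
}

type execRequest struct {
	containerID string
	cmd         []string
}

// miniServer plays the role of the streaming server.
type miniServer struct {
	mu      sync.Mutex
	next    int
	pending map[string]execRequest
	runtime Runtime
	baseURL string
}

// GetExec is step 1: remember the request and return a URL that contains only a token.
func (s *miniServer) GetExec(containerID string, cmd []string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.next++
	token := fmt.Sprintf("tok%d", s.next) // predictable on purpose; real tokens must be random
	s.pending[token] = execRequest{containerID, cmd}
	return s.baseURL + "/exec/" + token
}

// serveExec is step 2: look up the token and delegate to the runtime.
func (s *miniServer) serveExec(w http.ResponseWriter, r *http.Request) {
	token := strings.TrimPrefix(r.URL.Path, "/exec/")
	s.mu.Lock()
	req, ok := s.pending[token]
	delete(s.pending, token)
	s.mu.Unlock()
	if !ok {
		http.NotFound(w, r)
		return
	}
	if err := s.runtime.Exec(req.containerID, req.cmd, w); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

// runOnce performs both steps against an in-process test server and returns the output.
func runOnce(containerID string, cmd []string) (string, error) {
	s := &miniServer{pending: map[string]execRequest{}, runtime: echoRuntime{}}
	mux := http.NewServeMux()
	mux.HandleFunc("/exec/", s.serveExec)
	ts := httptest.NewServer(mux)
	defer ts.Close()
	s.baseURL = ts.URL

	resp, err := http.Get(s.GetExec(containerID, cmd))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	out, err := runOnce("c1", []string{"ls", "-l"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Swapping echoRuntime for another type that satisfies Runtime changes what step 2 does without touching the server code, which is the point of having the shim implement only the interface.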
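About RedirectContainerStreaming
As mentioned earlier, if kubelet has RedirectContainerStreaming enabled, kubelet returns the streaming URL to the apiserver, which then redirects to that streaming URL. This design avoids having every streaming request pass through kubelet and put load on it. The downside, on the other hand, is that kubelet's authentication cannot be used.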
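The difference between the two modes can be observed from the caller's side with a small experiment. This is a sketch, not the kubelet's code: execHandler and fetch are hypothetical names, a plain httputil.ReverseProxy stands in for the kubelet's proxyStream, and the test client plays the part of the apiserver.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

// execHandler mimics the final branch of getExec: either redirect the caller
// to the streaming URL or proxy the request to it.
func execHandler(streamURL *url.URL, redirect bool) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if redirect {
			http.Redirect(w, r, streamURL.String(), http.StatusFound)
			return
		}
		proxy := &httputil.ReverseProxy{Director: func(req *http.Request) {
			u := *streamURL // copy so the shared URL is never modified
			req.URL = &u
			req.Host = u.Host
		}}
		proxy.ServeHTTP(w, r)
	})
}

// fetch calls the front handler without following redirects and reports
// the status code, Location header, and body that the caller sees.
func fetch(redirect bool) (int, string, string, error) {
	// backend stands in for the runtime's streaming server.
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "stream data")
	}))
	defer backend.Close()

	streamURL, err := url.Parse(backend.URL + "/exec/token123")
	if err != nil {
		return 0, "", "", err
	}
	front := httptest.NewServer(execHandler(streamURL, redirect))
	defer front.Close()

	client := &http.Client{CheckRedirect: func(*http.Request, []*http.Request) error {
		return http.ErrUseLastResponse // surface the 302 instead of following it
	}}
	resp, err := client.Get(front.URL + "/exec/ns/pod/container")
	if err != nil {
		return 0, "", "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, resp.Header.Get("Location"), string(body), err
}

func main() {
	for _, redirect := range []bool{true, false} {
		status, location, body, err := fetch(redirect)
		if err != nil {
			panic(err)
		}
		fmt.Printf("redirect=%v status=%d location=%q body=%q\n", redirect, status, location, body)
	}
}
```

With redirect enabled the caller only receives a 302 and a Location header and must open the stream itself; with it disabled the data flows back through the front handler.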
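Next, let's take a closer look at what this parameter really does. The default runtime in kubelet is docker, so the subject of study here is dockershim:
Case 1: RedirectContainerStreaming is set to true. The returned URL then looks like /cri/exec/aRbQe4pn. Note that this URL has no hostname by default, so the redirect goes to the original hostname, i.e. the hostname kubelet listens on. So when dockershim is the default runtime, setting RedirectContainerStreaming to true makes no essential difference: the load on kubelet is not reduced, because all stream handling still goes through kubelet.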
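Why a host-less redirect target ends up back at kubelet can be checked with Go's standard URL resolution. In this sketch the kubelet address node1:10250 and the helper name resolveRedirect are made up for illustration; the two target URLs are the ones quoted in this article.

```go
package main

import (
	"fmt"
	"net/url"
)

// resolveRedirect returns the absolute URL a client ends up requesting when it
// receives the given Location value in response to a request for requestURL.
func resolveRedirect(requestURL, location string) (string, error) {
	base, err := url.Parse(requestURL)
	if err != nil {
		return "", err
	}
	loc, err := url.Parse(location)
	if err != nil {
		return "", err
	}
	return base.ResolveReference(loc).String(), nil
}

func main() {
	// node1:10250 is a hypothetical kubelet address.
	original := "https://node1:10250/exec/default/mypod/mycontainer?command=ls"

	// A path-only target keeps the scheme and host of the original request.
	relative, err := resolveRedirect(original, "/cri/exec/aRbQe4pn")
	if err != nil {
		panic(err)
	}
	fmt.Println(relative) // https://node1:10250/cri/exec/aRbQe4pn

	// A target with its own host replaces them entirely.
	absolute, err := resolveRedirect(original, "http://127.0.0.1:36699/exec/8rYzmQK9")
	if err != nil {
		panic(err)
	}
	fmt.Println(absolute) // http://127.0.0.1:36699/exec/8rYzmQK9
}
```

The first result still points at the host the original request was sent to, which is why the redirected stream continues to pass through kubelet.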
The path /cri/exec/aRbQe4pn above is handled in the kubelet server by criHandler. When kubelet starts, it assigns dockerService to criHandler (dockerService here is in fact dockershim):
    if crOptions.RedirectContainerStreaming {
        klet.criHandler = ds
    }
The implementation in dockerService is:
    func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        if ds.streamingServer != nil {
            ds.streamingServer.ServeHTTP(w, r)
        } else {
            http.NotFound(w, r)
        }
    }
This simply calls the ServeHTTP method of streamingServer:
    func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        s.handler.ServeHTTP(w, r)
    }
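streamingServer then handles the request with the registered handlers, which brings us back to the second step described above: handling streaming requests.
The overall workflow is as follows: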
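Case 2: RedirectContainerStreaming is set to false. The streaming URL obtained in the first step then looks like http://127.0.0.1:36699/exec/8rYzmQK9. Note that this one does include a hostname. Because kubelet does not return this URL to the apiserver but requests it directly as a proxy, it can communicate over 127.0.0.1. This localhost port must therefore be listened on by dockershim: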
    // Start initializes and starts components in dockerService.
    func (ds *dockerService) Start() error {
        ds.initCleanup()

        // Initialize the legacy cleanup flag.
        if ds.startLocalStreamingServer {
            go func() {
                if err := ds.streamingServer.Start(true); err != nil {
                    klog.Fatalf("Streaming server stopped unexpectedly: %v", err)
                }
            }()
        }
        return ds.containerManager.Start()
    }
dockershim calls the Start method of streamingServer to start listening on the port:
    func (s *server) Start(stayUp bool) error {
        if !stayUp {
            // TODO(tallclair): Implement this.
            return errors.New("stayUp=false is not yet implemented")
        }

        listener, err := net.Listen("tcp", s.config.Addr)
        if err != nil {
            return err
        }
        // Use the actual address as baseURL host. This handles the "0" port case.
        s.config.BaseURL.Host = listener.Addr().String()
        if s.config.TLSConfig != nil {
            return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
        }
        return s.server.Serve(listener)
    }
With a server listening on localhost, this again brings us back to the second step described above: handling streaming requests.
The overall workflow is as follows:
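The "0" port case mentioned in the Start comment explains where a port such as 36699 comes from: when a listener is opened on port 0, the operating system picks a free port, and the real address has to be read back from the listener. A minimal sketch, with listenLocal as a hypothetical helper name:

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// listenLocal opens a TCP listener on the loopback address with an OS-chosen
// port and returns it together with the base URL that clients should use.
func listenLocal() (net.Listener, *url.URL, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, nil, err
	}
	// The listener reports the port that was actually assigned, not "0".
	base := &url.URL{Scheme: "http", Host: l.Addr().String()}
	return l, base, nil
}

func main() {
	l, base, err := listenLocal()
	if err != nil {
		panic(err)
	}
	defer l.Close()
	// The port differs from run to run.
	fmt.Println("streaming URLs would start with:", base.String())
}
```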
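Comparing the two workflow diagrams closely shows that RedirectContainerStreaming has no real effect with the default dockershim. In this case it is still recommended to set it to false so that streaming requests are authenticated.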
Reference
kubernetes PR#38742: [CRI] Don't include user data in CRI streaming redirect URLs