docker1.12-containerd源码分析

简介:

     从原openstack转型至docker已有一段时间。更稳定的使用docker了解docker的各流程,从源代码层面了解下containerd。本文基于docker 1.12版本,从1.11开始docker已拆分docker daemon

containerd源码流程图

wKioL1iB3_XRqLZuAAJ80zXVg4o002.png

源码接口调用详情

A)第一步从ctr入口至API接口

checkpoint(用于快照,docker目前该功能不完善)

1
2
3
list -->  /types .API /ListCheckpoint 
create -->  /types .API /CreateCheckpoint 
delete -->  /types .API /DeleteCheckpoint


containers

1
2
3
4
5
6
7
8
9
list、state --> /types .API /State
pause、resume、update --> /types .API /UpdateContainer
create -->  /types .API /CreateContainer
stats -->  /types .API /Stats
watch  --> /types .API /State  /types .API /Events
exec  --> /types .API /Events  /types .API /AddProcess  /types .API /UpdateProcess
kill  --> /types .API /Signal
start --> /types .API /Events  、  /types .API /CreateContainer  /types .API /UpdateProcess
update --> /types .API /UpdateContainer

events

1
/types .API /Events

state

1
/types .API /State

version

1
/types .API /GetServerVersion  -- return  result

B)第二步从API接口至supervisor任务处理

1
注:API--server.go --> daemon – supervisor.go(handleTask func)

checkpoint

1
2
3
/types .API /ListCheckpoint  (supervisor.GetContainersTask)--> getContainers 
/types .API /CreateCheckpoint  --> createCheckpoint
/types .API /DeleteCheckpoint  --> deleteCheckpoint

containers

1
2
3
4
5
6
7
/types .API /State  /types .API /Stats  (supervisor.GetContainersTask)--> getContainers 
/types .API /UpdateContainer  (supervisor.UpdateTask)-->updateContainer
/types .API /CreateContainer  (supervisor.StartTask)-->start
/types .API /Events  --> Events -- return  result
/types .API /AddProcess  -->addProcess
/types .API /UpdateProcess  -->updateProcess
/types .API /Signal  -->signal

 

C)第三步从任务队列至runtime至runc

checkpoint

1
2
3
getContainers --  return  result
createCheckpoint -->(runtime)CheckPoint --> exec .Command(c.runtime,arg....)
deleteCheckpoint -->(runtime)DeleteCheckpoint --  return  result

containers

1
2
3
4
5
6
getContainers --  return  result
updateContainer -->(runtime)Resume Pause UpdateResources--> exec .Command(c.runtime,arg....)
start -->(runtime supervisor /worker .go) Start --> exec .Command(c.shim,c. id ,c.bundle,c.runtime)
addProcess -->(runtime)  exec  -->  exec .Command(c.shim,c. id ,c.bundle,c.runtime)
updateProcess --> return  result
signal --> return  result


createContainer示例

deamon启动监听tasks及startTasks进程

a)进入main.go main方法调用daemon方法 

1
2
3
4
5
6
7
app.Action = func(context *cli.Context) {
        if  err := daemon(context); err != nil {
               logrus.Fatal(err)
        }
  
  
}

b)进入main.go daemon方法

1
2
3
4
5
6
7
8
for  i := 0; i < 10; i++ {
        wg.Add(1)
        w := supervisor.NewWorker(sv, wg)
        go w.Start()
}
if  err := sv.Start(); err != nil {
        return  err
}

c)初始化supervisor/worker.go NewWorker并启动监听startTask并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func NewWorker(s *Supervisor, wg * sync .WaitGroup) Worker {
        return  &worker{
               s:  s,
               wg: wg,
        }
}
  
  
  
func (w *worker) Start() {
        defer w.wg.Done()
        for  t := range w.s.startTasks {
               started :=  time .Now()
               process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
               if  err != nil {
                      logrus.WithFields(logrus.Fields{
                             "error" : err,
                             "id" :    t.Container.ID(),
                      }).Error( "containerd: start container" )
                      t.Err <- err
                      evt := &DeleteTask{
                             ID:      t.Container.ID(),
                             NoEvent:  true ,
                             Process: process,
                      }
                      w.s.SendTask(evt)
                      continue
               }

d)启动supervisor/supervisor.go task监听task并处理

1
2
3
4
5
6
7
8
9
10
11
12
13
func (s *Supervisor) Start() error {
        logrus.WithFields(logrus.Fields{
               "stateDir" :    s.stateDir,
               "runtime" :     s.runtime,
               "runtimeArgs" : s.runtimeArgs,
               "memory" :      s.machine.Memory,
               "cpus" :        s.machine.Cpus,
        }).Debug( "containerd: supervisor running" )
        go func() {
               for  i := range s.tasks {
                      s.handleTask(i)
  
               }

containers容器创建示例

Ctl控制台命令入口

ctr/main.go  containersCommand 

1
2
3
4
5
6
7
8
9
10
11
execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
  
updateCommand,

ctr/container.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var startCommand = cli.Command{
        Name:       "start" ,
        Usage:      "start a container" ,
        ArgsUsage: "ID BundlePath”, ————…...
  
  
  
         events, err := c.Events(netcontext.Background(), &types.EventsRequest{})/*事件创建*/
        if  err != nil {
               fatal(err.Error(), 1)
        }
        if  _, err := c.CreateContainer(netcontext.Background(), r); err != nil {/*容器创建*/
               fatal(err.Error(), 1)
        }
        if  context.Bool( "attach" ) {
               go func() {
                      io.Copy(stdin, os.Stdin)
                      if  _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{/*更新进程*/
                             Id:          id ,
                             Pid:         "init" ,
                             CloseStdin:  true ,
                      }); err != nil {
                             fatal(err.Error(), 1)
                      }
                      restoreAndCloseStdin()
               }()
               if  tty  {
                      resize( id "init" , c)
                      go func() {
                             s :=  make (chan os.Signal, 64)
                             signal.Notify(s, syscall.SIGWINCH)
                             for  range s {
                                    if  err := resize( id "init" , c); err != nil {
                                           log.Println(err)
                                    }
                             }
                      }()
               }
               waitForExit(c, events,  id "init" , restoreAndCloseStdin)
        }
  
},

api处理

api/grpc/types/api.pb.go 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (c *aPIClient) Events(ctx context.Context,  in  *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
        stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc,  "/types.API/Events" , opts...)
        if  err != nil {
               return  nil, err
        }
        x := &aPIEventsClient{stream}
        if  err := x.ClientStream.SendMsg( in ); err != nil {
               return  nil, err
        }
        if  err := x.ClientStream.CloseSend(); err != nil {
               return  nil, err
        }
        return  x, nil
}
  
  
  
func (c *aPIClient) CreateContainer(ctx context.Context,  in  *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
        out := new(CreateContainerResponse)
        err := grpc.Invoke(ctx,  "/types.API/CreateContainer" in , out, c.cc, opts...)
        if  err != nil {
               return  nil, err
        }
        return  out, nil
}
  
  
  
func (c *aPIClient) UpdateProcess(ctx context.Context,  in  *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
        out := new(UpdateProcessResponse)
        err := grpc.Invoke(ctx,  "/types.API/UpdateProcess" in , out, c.cc, opts...)
        if  err != nil {
               return  nil, err
        }
        return  out, nil
}

api/grpc/types/api.pb.go 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
        m := new(EventsRequest)
        if  err := stream.RecvMsg(m); err != nil {
               return  err
        }
        return  srv.(APIServer).Events(m, &aPIEventsServer{stream})
}
  
  
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
        in  := new(CreateContainerRequest)
        if  err := dec( in ); err != nil {
               return  nil, err
        }
        if  interceptor == nil {
               return  srv.(APIServer).CreateContainer(ctx,  in )
        }
        info := &grpc.UnaryServerInfo{
               Server:     srv,
               FullMethod:  "/types.API/CreateContainer" ,
        }
        handler := func(ctx context.Context, req interface{}) (interface{}, error) {
               return  srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
        }
        return  interceptor(ctx,  in , info, handler)
}
  
  
  
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
        in  := new(UpdateProcessRequest)
        if  err := dec( in ); err != nil {
               return  nil, err
        }
        if  interceptor == nil {
               return  srv.(APIServer).UpdateProcess(ctx,  in )
        }
        info := &grpc.UnaryServerInfo{
               Serv