Docker基础之containerd的两个组件:container和process

一、container结构体函数

type container struct {
	// 存储运行时状态信息的路径
	root        string //containerd的工作目录
	id          string
	bundle      string
	runtime     string
	runtimeArgs []string
	shim        string
	processes   map[string]*process
	labels      []string
	oomFds      []int
	//比较chroot和pivot_root
	noPivotRoot bool
	timeout     time.Duration
}
// 生成一个container,并把state信息记录在state.json文件中
func New(opts ContainerOpts) (Container, error) {
	c := &container{
		root:        opts.Root,
		id:          opts.ID,
		bundle:      opts.Bundle,
		labels:      opts.Labels,
		processes:   make(map[string]*process),
		runtime:     opts.Runtime,
		runtimeArgs: opts.RuntimeArgs,
		shim:        opts.Shim,
		noPivotRoot: opts.NoPivotRoot,
		timeout:     opts.Timeout,
	}
	//创建state.json
	if err := os.Mkdir(filepath.Join(c.root, c.id), 0755); err != nil {
		return nil, err
	}
	f, err := os.Create(filepath.Join(c.root, c.id, StateFile))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	if err := json.NewEncoder(f).Encode(state{
		Bundle:      c.bundle,
		Labels:      c.labels,
		Runtime:     c.runtime,
		RuntimeArgs: c.runtimeArgs,
		Shim:        c.shim,
		NoPivotRoot: opts.NoPivotRoot,
	}); err != nil {
		return nil, err
	}
	return c, nil
}

1、 container的Start()函数

func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
	//创建容器初始化进程工作的根目录
	processRoot := filepath.Join(c.root, c.id, InitProcessID)
	if err := os.Mkdir(processRoot, 0755); err != nil {
		return nil, err
	}
	//生成命令cmd:shim {containerID} {bundleDirPath} runc,工作目录为process目录processRoot;containerd-shim 会调用`runc create {containerID}`
	cmd := exec.Command(c.shim,
		c.id, c.bundle, c.runtime,
	)
	cmd.Dir = processRoot //指定cmd命令执行的工作目录,这里是init命令
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	//读取bundle目录下的config.json文件
	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	config := &processConfig{
		checkpoint: checkpointPath,
		root:       processRoot,
		//InitProcessID = "init" 所有容器的初始化进程
		id:          InitProcessID,
		c:           c,
		stdio:       s,
		spec:        spec,
		processSpec: specs.ProcessSpec(spec.Process),
	}
	//根据构建的进程配置文件config,创建一个type process struct对象
	p, err := newProcess(config)
	if err != nil {
		return nil, err
	}
	if err := c.createCmd(InitProcessID, cmd, p); err != nil {
		return nil, err
	}
	return p, nil
}

2、func Exec()函数
docker exec cmd根据传入的参数生成一个p Process,此时,宿主机上会再执行一个contaninerd-shim,把cmd传递给contaninerd-shim。 如果再加上docker start时的init进程,宿主机上会同时存在两个contaninerd-shim进程。

func (c *container) Exec(pid string, pspec specs.ProcessSpec, s Stdio) (pp Process, err error) {
	//生成exec cmd的工作目录
	processRoot := filepath.Join(c.root, c.id, pid)
	if err := os.Mkdir(processRoot, 0755); err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			c.RemoveProcess(pid)
		}
	}()
	//和初始化进程类似,通过containerd-shim执行,cmd.Dir = processRoot ,其值和初始化进程是不一样的,shim正是根据process.json中的相关属性来判断是Start()还是Exec()
	cmd := exec.Command(c.shim,
		c.id, c.bundle, c.runtime,
	)
	cmd.Dir = processRoot //指定cmd的工作目录
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}
	spec, err := c.readSpec()
	if err != nil {
		return nil, err
	}
	config := &processConfig{
		exec:        true,
		id:          pid,
		root:        processRoot,
		c:           c,
		processSpec: pspec,
		spec:        spec,
		stdio:       s,
	}
	//config会写入process.json文件
	p, err := newProcess(config)
	if err != nil {
		return nil, err
	}
	if err := c.createCmd(pid, cmd, p); err != nil {
		return nil, err
	}
	return p, nil
}

3、createCmd()函数
createCmd()执行一个前面Start()和Exec()生成的containerd-shim命令。
当具体容器内进程pid生成(由runc生成)后,createCmd会启动一个go routine来等待shim命令的结束。
shim命令一般不会退出。
当shim发生退出时,如果容器内的进程仍在运行,则需要把该进程杀死;如果容器内进程已经不存在,则无需清理工作。

func (c *container) createCmd(pid string, cmd *exec.Cmd, p *process) error {
	p.cmd = cmd
	//执行cmd containerd-shim命令
	if err := cmd.Start(); err != nil {
		close(p.cmdDoneCh)
		if exErr, ok := err.(*exec.Error); ok {
			if exErr.Err == exec.ErrNotFound || exErr.Err == os.ErrNotExist {
				return fmt.Errorf("%s not installed on system", c.shim)
			}
		}
		return err
	}
	go func() {
		//Wait(),等待cmd执行完成
		err := p.cmd.Wait()
		if err == nil {
			p.cmdSuccess = true
		}
		close(p.cmdDoneCh)
	}()
	//waitForCreate,等待进程创建完成
	if err := c.waitForCreate(p, cmd); err != nil {
		return err
	}
	c.processes[pid] = p //记录成功创建的容器中进程pid
	return nil
}

4、Load()函数
Load()函数是读取container的state.json及各进程的process.json,还原container对象

// 读取container的state.json及各进程的process.json,还原container对象。
func Load(root, id, shimName string, timeout time.Duration) (Container, error) {
	var s state
	f, err := os.Open(filepath.Join(root, id, StateFile))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	c := &container{
		root:        root,
		id:          id,
		bundle:      s.Bundle,
		labels:      s.Labels,
		runtime:     s.Runtime,
		runtimeArgs: s.RuntimeArgs,
		shim:        s.Shim,
		noPivotRoot: s.NoPivotRoot,
		processes:   make(map[string]*process),
		timeout:     timeout,
	}
	if c.shim == "" {
		c.shim = shimName
	}
	dirs, err := ioutil.ReadDir(filepath.Join(root, id))
	if err != nil {
		return nil, err
	}
	// 一个目录代表一个进程
	for _, d := range dirs {
		//如果容器只有一个初始化进程,那么一个是init文件夹,一个是state.json文件
		if !d.IsDir() {
			continue
		}
		pid := d.Name()
		// 读取var/run/docker/libcontainerd/containerd/{containerID}/init/process.json
		s, err := readProcessState(filepath.Join(root, id, pid))
		if err != nil {
			return nil, err
		}
		p, err := loadProcess(filepath.Join(root, id, pid), pid, c, s)
		if err != nil {
			logrus.WithField("id", id).WithField("pid", pid).Debug("containerd: error loading process %s", err)
			continue
		}
		c.processes[pid] = p
	}
	return c, nil
}

5、readProcessState()函数
readProcessState()函数是读取var/run/docker/libcontainerd/containerd/{containerID}/init/process.json,获取容器中进程状态

//读取var/run/docker/libcontainerd/containerd/{containerID}/init/process.json
func readProcessState(dir string) (*ProcessState, error) {
	f, err := os.Open(filepath.Join(dir, "process.json"))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var s ProcessState
	if err := json.NewDecoder(f).Decode(&s); err != nil {
		return nil, err
	}
	return &s, nil
}

6、readSpec()函数
readSpec()读取bundle目录下的config.json文件。

//readSpec()读取bundle目录下的config.json文件。
func (c *container) readSpec() (*specs.Spec, error) {
	var spec specs.Spec
	f, err := os.Open(filepath.Join(c.bundle, "config.json"))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	if err := json.NewDecoder(f).Decode(&spec); err != nil {
		return nil, err
	}
	return &spec, nil
}

7、Delete()函数
Delete()先移除containerd目录下的容器目录,然后调用runc delete id删除容器。

//Delete()先移除containerd目录下的容器目录,然后调用runc delete id删除容器。
func (c *container) Delete() error {
	err := os.RemoveAll(filepath.Join(c.root, c.id))
	//调用runc delete {containerID}
	args := c.runtimeArgs
	args = append(args, "delete", c.id)
	if b, derr := exec.Command(c.runtime, args...).CombinedOutput(); err != nil {
		err = fmt.Errorf("%s: %q", derr, string(b))
	}
	return err
}

8、RemoveProcess()函数

//删除指定process的目录。在containerd中,一个process用一个目录表示。
func (c *container) RemoveProcess(pid string) error {
	delete(c.processes, pid)
	return os.RemoveAll(filepath.Join(c.root, c.id, pid))
}

9、Pause()和Resume()

//调用`runc pause {containerID}`挂起一个容器
func (c *container) Pause() error {
	args := c.runtimeArgs
	args = append(args, "pause", c.id)
	b, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s: %q", err.Error(), string(b))
	}
	return nil
}
//与Pause()相对应,Resume()恢复某一容器。
func (c *container) Resume() error {
	args := c.runtimeArgs
	args = append(args, "resume", c.id)
	b, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return fmt.Errorf("%s: %q", err.Error(), string(b))
	}
	return nil
}

10、Status()函数

//通过runc state {containerID}获取容器的状态信息
func (c *container) Status() (State, error) {
	args := c.runtimeArgs
	args = append(args, "state", c.id)
	out, err := exec.Command(c.runtime, args...).CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("%s: %q", err.Error(), out)
	}
	//我们只需要运行时json输出有一个*的Status字段。
	var s struct {
		Status State `json:"status"`
	}
	if err := json.Unmarshal(out, &s); err != nil {
		return "", err
	}
	return s.Status, nil
}

11、waitForCreate()函数

//等待进程process创建结束
func (c *container) waitForCreate(p *process, cmd *exec.Cmd) error {
	wc := make(chan error, 1)
	//新建一个goroutine对象来循环读取pidfile(pid文件其实就记录了一行表示该进程的pid),如果都到该pid的存在,表示创建成功。
	go func() {
		for {
			if _, err := p.getPidFromFile(); err != nil {
				//读取失败后的处理
				if os.IsNotExist(err) || err == errInvalidPidInt {
					alive, err := isAlive(cmd)
					if err != nil {
						wc <- err
						return
					}
					if !alive {
						// Runc可能无法运行容器,所以让我们从日志中获取错误,否则垫片可能会遇到错误
						messages, err := readLogMessages(filepath.Join(p.root, "shim-log.json"))
						if err != nil {
							wc <- err
							return
						}
						for _, m := range messages {
							if m.Level == "error" {
								wc <- fmt.Errorf("shim error: %v", m.Msg)
								return
							}
						}
						// 没有错误报告从shim返回,检查runc/运行时错误
						messages, err = readLogMessages(filepath.Join(p.root, "log.json"))
						if err != nil {
							if os.IsNotExist(err) {
								err = ErrContainerNotStarted
							}
							wc <- err
							return
						}
						for _, m := range messages {
							if m.Level == "error" {
								wc <- fmt.Errorf("oci runtime error: %v", m.Msg)
								return
							}
						}
						wc <- ErrContainerNotStarted
						return
					}
					time.Sleep(15 * time.Millisecond)
					continue
				}
				wc <- err
				return
			}
			// 日志含义读取pid文件成功
			wc <- nil
			return
		}
	}()
	select {
	//channel wc的容量为1
	case err := <-wc:
		if err != nil {
			return err
		}
		return nil
	case <-time.After(c.timeout):
		cmd.Process.Kill()
		cmd.Wait()
		return ErrContainerStartTimeout
	}
}

二、process 构造函数

表示容器内部运行的一个进程

type process struct {
	root        string 
	id          string //标志符
	pid         int //进程标志符
	exitPipe    *os.File //退出文件管道
	controlPipe *os.File //控制文件管道
	container   *container //容器
	spec        specs.ProcessSpec
	stdio       Stdio //标准io
	cmd         *exec.Cmd //控制台
	cmdSuccess  bool //控制指令执行成功标志符
	cmdDoneCh   chan struct{}
	state       State //状态
	stateLock   sync.Mutex
}

1、 newProcess()函数

//列出一个新进程的相关参数
func newProcess(config *processConfig) (*process, error) {
	p := &process{
		root:      config.root,
		id:        config.id,
		container: config.c,
		spec:      config.processSpec,
		stdio:     config.stdio,
		cmdDoneCh: make(chan struct{}),
		state:     Running,
	}
	uid, gid, err := getRootIDs(config.spec)
	if err != nil {
		return nil, err
	}
	//创建process.json文件
	f, err := os.Create(filepath.Join(config.root, "process.json"))
	if err != nil {
		return nil, err
	}
	defer f.Close()
	//构建进程状态文件
	ps := ProcessState{
		ProcessSpec: config.processSpec,
		Exec:        config.exec,
		PlatformProcessState: PlatformProcessState{
			Checkpoint: config.checkpoint,
			RootUID:    uid,
			RootGID:    gid,
		},
		Stdin:       config.stdio.Stdin,
		Stdout:      config.stdio.Stdout,
		Stderr:      config.stdio.Stderr,
		RuntimeArgs: config.c.runtimeArgs,
		NoPivotRoot: config.c.noPivotRoot,
	}
	//写入process.json
	if err := json.NewEncoder(f).Encode(ps); err != nil {
		return nil, err
	}
	//根据ExitFile文件路径创建Mkfifo管道对象,并以O_NONBLOCK,O_RDONLY模式打开管道OpenFile等待写入的数据并读取。
	exit, err := getExitPipe(filepath.Join(config.root, ExitFile))
	if err != nil {
		return nil, err
	}
	//根据ControlFile文件路径创建Mkfifo管道对象,以O_NONBLOCK,O_RDWD模式打开管道OpenFile等待写入的数据并读取
	control, err := getControlPipe(filepath.Join(config.root, ControlFile))
	if err != nil {
		return nil, err
	}
	//把fifo管道exit赋给p.exitPipe,后面由monitor的epoll机制来读取该管道
	p.exitPipe = exit
	p.controlPipe = control
	return p, nil
}

2、loadProcess()函数

//从process.json中还原process的信息
func loadProcess(root, id string, c *container, s *ProcessState) (*process, error) {
	p := &process{
		root:      root,
		id:        id,
		container: c,
		spec:      s.ProcessSpec,
		stdio: Stdio{
			Stdin:  s.Stdin,
			Stdout: s.Stdout,
			Stderr: s.Stderr,
		},
		state: Stopped,
	}
	if _, err := p.getPidFromFile(); err != nil {
		return nil, err
	}
	if _, err := p.ExitStatus(); err != nil {
		if err == ErrProcessNotExited {
			exit, err := getExitPipe(filepath.Join(root, ExitFile))
			if err != nil {
				return nil, err
			}
			p.exitPipe = exit

			control, err := getControlPipe(filepath.Join(root, ControlFile))
			if err != nil {
				return nil, err
			}
			p.controlPipe = control

			p.state = Running
			return p, nil
		}
		return nil, err
	}
	return p, nil
}

3、Signal()函数

// S信号量用于控制进程执行顺序
func (p *process) Signal(s os.Signal) error {
	return syscall.Kill(p.pid, s.(syscall.Signal))
}

4、 process的Start()函数

// Start解除与容器相关的初始化进程的阻塞。
func (p *process) Start() error {
	if p.ID() == InitProcessID {
		var (
			errC = make(chan error, 1)
			args = append(p.container.runtimeArgs, "start", p.container.id)
			cmd  = exec.Command(p.container.runtime, args...)
		)
		go func() {
			//若果runc start执行成功,向errC发送nil
			out, err := cmd.CombinedOutput()
			if err != nil {
				errC <- fmt.Errorf("%s: %q", err.Error(), out)
			}
			errC <- nil
		}()
		select {
		case err := <-errC: //出现error
			if err != nil {
				return err
			}
		case <-p.cmdDoneCh: //cmdDoneCh被关闭之后,判断如果不是成功执行,强制kill掉
			if !p.cmdSuccess {//指令执行不成功的操作
				cmd.Process.Kill()
				cmd.Wait()
				return ErrShimExited
			}
			err := <-errC
			if err != nil {
				return err
			}
		}
	}
	return nil
}
上一篇:jquery的DOM节点操作(复制元素节点)


下一篇:python django 基本测试 及调试 201812