码迷,mamicode.com
首页 > 其他好文 > 详细

docker exec的具体执行流程

时间:2015-10-29 22:02:26      阅读:841      评论:0      收藏:0      [点我收藏+]

标签:

首先,做一个docker exec的请求:

docker exec -it 5504f937f7bb sh

对应的docker -d(启动的docker daemon)的输出为

INFO[0211] POST /v1.20/containers/5504f937f7bb/exec     
INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/start 
INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/resize?h=69&w=236

//exit退出docker exec启动的tty后
2015/10/29 00:47:51 http: response.WriteHeader on hijacked connection
INFO[0294] GET /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/json


我自己创建了一个docker exec的main函数,模拟docker exec的客户端执行流程(入口执行)

package main

import (
        _"fmt"
        "github.com/docker/docker/api/client"
)

func main() {
        client.DoEexc()
}

然后是模拟docker exec请求的过程,自己重新写了一个,跳过flag模块对参数的解释,直接上静态的参数

package client

import (
	"encoding/json"
	"fmt"
	"io"

	"github.com/Sirupsen/logrus"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/cli"
	flag "github.com/docker/docker/pkg/mflag"
	"github.com/docker/docker/pkg/promise"
	"github.com/docker/docker/pkg/term"
	_ "github.com/docker/docker/runconfig"
)

type ExecConfig struct {
	User         string   // User that will run the command
	Privileged   bool     // Is the container in privileged mode
	Tty          bool     // Attach standard streams to a tty.
	Container    string   // Name of the container (to execute in)
	AttachStdin  bool     // Attach the standard input, makes possible user interaction
	AttachStderr bool     // Attach the standard output
	AttachStdout bool     // Attach the standard error
	Detach       bool     // Execute in detach mode
	Cmd          []string // Execution commands and args
}

var commonFlags = &cli.CommonFlags{FlagSet: new(flag.FlagSet)}

var clientFlags = &cli.ClientFlags{FlagSet: new(flag.FlagSet), Common: commonFlags}

func DoEexc() {
	stdin, stdout, stderr := term.StdStreams()
	newcli := NewDockerCli(stdin, stdout, stderr, clientFlags)

	if err := newcli.init(); err != nil {
		fmt.Printf("docker cli init err %s \n", err)
	}
	CmdExec(newcli)
}

func CmdExec(newcli *DockerCli) error {

	execConfig := &ExecConfig{
		User:         "",
		Privileged:   false,
		Tty:          true,
		Cmd:          []string{"sh"},
		Container:    "5504f937f7bb",
		Detach:       false,
		AttachStdin:  true,
		AttachStderr: true,
		AttachStdout: true,
	}

//对应daemon的第一个输出
serverResp, err := newcli.call("POST", "/containers/"+execConfig.Container+"/exec", execConfig, nil)
	
	if err != nil {
		return err
	}

	defer serverResp.body.Close()

	var response types.ContainerExecCreateResponse
	if err := json.NewDecoder(serverResp.body).Decode(&response); err != nil {
		return err
	}

	execID := response.ID
//Daemon会分配一个execID
	fmt.Printf("response id %d \n", execID)

	if execID == "" {
		fmt.Fprintf(newcli.out, "exec ID empty")
		return nil
	}

	//Temp struct for execStart so that we don‘t need to transfer all the execConfig
	execStartCheck := &types.ExecStartCheck{
		Detach: execConfig.Detach,
		Tty:    execConfig.Tty,
	}

	if !execConfig.Detach {
		if err := newcli.CheckTtyInput(execConfig.AttachStdin, execConfig.Tty); err != nil {
			return err
		}
	} else {
		if _, _, err := readBody(newcli.call("POST", "/exec/"+execID+"/start", execStartCheck, nil)); err != nil {
			return err
		}
		// For now don‘t print this - wait for when we support exec wait()
		// fmt.Fprintf(cli.out, "%s\n", execID)
		return nil
	}

	// 这个地方就是IO交互的处理流程,就是绑定tty,输出流重定向的过程
	var (
		out, stderr io.Writer
		in          io.ReadCloser
		hijacked    = make(chan io.Closer)
		errCh       chan error
	)

	// Block the return until the chan gets closed
	defer func() {
		logrus.Debugf("End of CmdExec(), Waiting for hijack to finish.")
		if _, ok := <-hijacked; ok {
			fmt.Fprintln(newcli.err, "Hijack did not finish (chan still open)")
		}
	}()

	if execConfig.AttachStdin {
		in = newcli.in
	}
	if execConfig.AttachStdout {
		out = newcli.out
	}
	if execConfig.AttachStderr {
		if execConfig.Tty {
			stderr = newcli.out
		} else {
			stderr = newcli.err
		}
	}
	//这个就是最关键的执行函数
	errCh = promise.Go(func() error {
		return newcli.hijack("POST", "/exec/"+execID+"/start", execConfig.Tty, in, out, stderr, hijacked, execConfig)
	})

	// Acknowledge the hijack before starting
	select {
	case closer := <-hijacked:
		// Make sure that hijack gets closed when returning. (result
		// in closing hijack chan and freeing server‘s goroutines.
		if closer != nil {
			defer closer.Close()
		}
	case err := <-errCh:
		if err != nil {
			logrus.Debugf("Error hijack: %s", err)
			return err
		}
	}

	if execConfig.Tty && newcli.isTerminalIn {
		if err := newcli.monitorTtySize(execID, true); err != nil {
			fmt.Fprintf(newcli.err, "Error monitoring TTY size: %s\n", err)
		}
	}

	if err := <-errCh; err != nil {
		logrus.Debugf("Error hijack: %s", err)
		return err
	}

	var status int
	if _, status, err = getExecExitCode(newcli, execID); err != nil {
		return err
	}

	if status != 0 {
		return newcli.StatusError{StatusCode: status}
	}

	return nil
}

来让我们分析分析最关键的那个newcli.hijack的执行流程

func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.ReadCloser, stdout, stderr io.Writer, started chan io.Closer, data interface{}) error {
	defer func() {
		if started != nil {
			close(started)
		}
	}()

	params, err := cli.encodeData(data)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(method, fmt.Sprintf("%s/v%s%s", cli.basePath, api.Version, path), params)
	if err != nil {
		return err
	}

	// Add CLI Config‘s HTTP Headers BEFORE we set the Docker headers
	// then the user can‘t change OUR headers
	for k, v := range cli.configFile.HTTPHeaders {
		req.Header.Set(k, v)
	}

	//
	req.Header.Set("User-Agent", "Docker-Client/"+dockerversion.VERSION+" ("+runtime.GOOS+")")
	req.Header.Set("Content-Type", "text/plain")
	req.Header.Set("Connection", "Upgrade")
	req.Header.Set("Upgrade", "tcp")
	req.Host = cli.addr

	dial, err := cli.dial() //新建立的连接
	// When we set up a TCP connection for hijack, there could be long periods
	// of inactivity (a long running command with no output) that in certain
	// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
	// state. Setting TCP KeepAlive on the socket connection will prohibit
	// ECONNTIMEOUT unless the socket connection truly is broken
	if tcpConn, ok := dial.(*net.TCPConn); ok {
		tcpConn.SetKeepAlive(true)
		tcpConn.SetKeepAlivePeriod(30 * time.Second)
	}
	if err != nil {
		if strings.Contains(err.Error(), "connection refused") {
			return fmt.Errorf("Cannot connect to the Docker daemon. Is ‘docker daemon‘ running on this host?")
		}
		return err
	}
	clientconn := httputil.NewClientConn(dial, nil)
	defer clientconn.Close()

	// Server hijacks the connection, error ‘connection closed‘ expected
	clientconn.Do(req) //对应Daemon的输出的第二条

	rwc, br := clientconn.Hijack() //clear out the buffer 清理掉缓冲区的数据
	defer rwc.Close()

	if started != nil {
		started <- rwc
	}

	var receiveStdout chan error

	var oldState *term.State
        //是否需要分配伪终端
	if in != nil && setRawTerminal && cli.isTerminalIn && os.Getenv("NORAW") == "" {
		oldState, err = term.SetRawTerminal(cli.inFd)
		if err != nil {
			return err
		}
		defer term.RestoreTerminal(cli.inFd, oldState)
	}

	if stdout != nil || stderr != nil {
		receiveStdout = promise.Go(func() (err error) {
			defer func() {
				if in != nil {
					if setRawTerminal && cli.isTerminalIn {
						term.RestoreTerminal(cli.inFd, oldState)
					}
					// For some reason this Close call blocks on darwin..
					// As the client exists right after, simply discard the close
					// until we find a better solution.
					if runtime.GOOS != "darwin" {
						in.Close()
					}
				}
			}()
                        //IO输出流的重定向在这里
			// When TTY is ON, use regular copy
			if setRawTerminal && stdout != nil {
				_, err = io.Copy(stdout, br)
			} else {
				_, err = stdcopy.StdCopy(stdout, stderr, br)
			}
			logrus.Debugf("[hijack] End of stdout")
			return err
		})
	}

	sendStdin := promise.Go(func() error {
	       //IO输入流的重定向在这里
		if in != nil {
			io.Copy(rwc, in)
			logrus.Debugf("[hijack] End of stdin")
		}

		if conn, ok := rwc.(interface {
			CloseWrite() error
		}); ok {
			if err := conn.CloseWrite(); err != nil {
				logrus.Debugf("Couldn‘t send EOF: %s", err)
			}
		}
		// Discard errors due to pipe interruption
		return nil
	})
        //阻塞在这里
	if stdout != nil || stderr != nil {
		if err := <-receiveStdout; err != nil {
			logrus.Debugf("Error receiveStdout: %s", err)
			return err
		}
	}

	if !cli.isTerminalIn {
		if err := <-sendStdin; err != nil {
			logrus.Debugf("Error sendStdin: %s", err)
			return err
		}
	}
	return nil
}

总结,重定向后,如果不是在后台执行就一直阻塞在那里,直到kill掉该进程

docker exec的具体执行流程

标签:

原文地址:http://my.oschina.net/yang1992/blog/523737

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!