标签:
首先,做一个docker exec的请求:
docker exec -it 5504f937f7bb sh
对应的docker -d(启动的docker daemon)的输出为:
INFO[0211] POST /v1.20/containers/5504f937f7bb/exec INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/start INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/resize?h=69&w=236 //exit退出docker exec启动的tty后 2015/10/29 00:47:51 http: response.WriteHeader on hijacked connection INFO[0294] GET /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/json
我自己创建了一个docker exec的main函数,模拟docker exec的客户端执行流程(入口执行):
package main import ( _"fmt" "github.com/docker/docker/api/client" ) func main() { client.DoEexc() }
然后是模拟docker exec请求的过程,自己重新写了一个,跳过flag模块对参数的解释,直接上静态的参数
package client import ( "encoding/json" "fmt" "io" "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" "github.com/docker/docker/cli" flag "github.com/docker/docker/pkg/mflag" "github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/term" _ "github.com/docker/docker/runconfig" ) type ExecConfig struct { User string // User that will run the command Privileged bool // Is the container in privileged mode Tty bool // Attach standard streams to a tty. Container string // Name of the container (to execute in) AttachStdin bool // Attach the standard input, makes possible user interaction AttachStderr bool // Attach the standard output AttachStdout bool // Attach the standard error Detach bool // Execute in detach mode Cmd []string // Execution commands and args } var commonFlags = &cli.CommonFlags{FlagSet: new(flag.FlagSet)} var clientFlags = &cli.ClientFlags{FlagSet: new(flag.FlagSet), Common: commonFlags} func DoEexc() { stdin, stdout, stderr := term.StdStreams() newcli := NewDockerCli(stdin, stdout, stderr, clientFlags) if err := newcli.init(); err != nil { fmt.Printf("docker cli init err %s \n", err) } CmdExec(newcli) } func CmdExec(newcli *DockerCli) error { execConfig := &ExecConfig{ User: "", Privileged: false, Tty: true, Cmd: []string{"sh"}, Container: "5504f937f7bb", Detach: false, AttachStdin: true, AttachStderr: true, AttachStdout: true, } //对应daemon的第一个输出 serverResp, err := newcli.call("POST", "/containers/"+execConfig.Container+"/exec", execConfig, nil) if err != nil { return err } defer serverResp.body.Close() var response types.ContainerExecCreateResponse if err := json.NewDecoder(serverResp.body).Decode(&response); err != nil { return err } execID := response.ID //Daemon会分配一个execID fmt.Printf("response id %d \n", execID) if execID == "" { fmt.Fprintf(newcli.out, "exec ID empty") return nil } //Temp struct for execStart so that we don‘t need to transfer all the execConfig execStartCheck := &types.ExecStartCheck{ Detach: execConfig.Detach, Tty: execConfig.Tty, } if !execConfig.Detach { if err := newcli.CheckTtyInput(execConfig.AttachStdin, execConfig.Tty); err != nil { return err } } else { if _, _, err := readBody(newcli.call("POST", "/exec/"+execID+"/start", execStartCheck, nil)); err != nil { return err } // For now don‘t print this - wait for when we support exec wait() // fmt.Fprintf(cli.out, "%s\n", execID) return nil } // 这个地方就是IO交互的处理流程,就是绑定tty,输出流重定向的过程 var ( out, stderr io.Writer in io.ReadCloser hijacked = make(chan io.Closer) errCh chan error ) // Block the return until the chan gets closed defer func() { logrus.Debugf("End of CmdExec(), Waiting for hijack to finish.") if _, ok := <-hijacked; ok { fmt.Fprintln(newcli.err, "Hijack did not finish (chan still open)") } }() if execConfig.AttachStdin { in = newcli.in } if execConfig.AttachStdout { out = newcli.out } if execConfig.AttachStderr { if execConfig.Tty { stderr = newcli.out } else { stderr = newcli.err } } //这个就是最关键的执行函数 errCh = promise.Go(func() error { return newcli.hijack("POST", "/exec/"+execID+"/start", execConfig.Tty, in, out, stderr, hijacked, execConfig) }) // Acknowledge the hijack before starting select { case closer := <-hijacked: // Make sure that hijack gets closed when returning. (result // in closing hijack chan and freeing server‘s goroutines. if closer != nil { defer closer.Close() } case err := <-errCh: if err != nil { logrus.Debugf("Error hijack: %s", err) return err } } if execConfig.Tty && newcli.isTerminalIn { if err := newcli.monitorTtySize(execID, true); err != nil { fmt.Fprintf(newcli.err, "Error monitoring TTY size: %s\n", err) } } if err := <-errCh; err != nil { logrus.Debugf("Error hijack: %s", err) return err } var status int if _, status, err = getExecExitCode(newcli, execID); err != nil { return err } if status != 0 { return newcli.StatusError{StatusCode: status} } return nil }
来让我们分析分析最关键的那个newcli.hijack的执行流程
func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.ReadCloser, stdout, stderr io.Writer, started chan io.Closer, data interface{}) error { defer func() { if started != nil { close(started) } }() params, err := cli.encodeData(data) if err != nil { return err } req, err := http.NewRequest(method, fmt.Sprintf("%s/v%s%s", cli.basePath, api.Version, path), params) if err != nil { return err } // Add CLI Config‘s HTTP Headers BEFORE we set the Docker headers // then the user can‘t change OUR headers for k, v := range cli.configFile.HTTPHeaders { req.Header.Set(k, v) } // req.Header.Set("User-Agent", "Docker-Client/"+dockerversion.VERSION+" ("+runtime.GOOS+")") req.Header.Set("Content-Type", "text/plain") req.Header.Set("Connection", "Upgrade") req.Header.Set("Upgrade", "tcp") req.Host = cli.addr dial, err := cli.dial() //新建立的连接 // When we set up a TCP connection for hijack, there could be long periods // of inactivity (a long running command with no output) that in certain // network setups may cause ECONNTIMEOUT, leaving the client in an unknown // state. Setting TCP KeepAlive on the socket connection will prohibit // ECONNTIMEOUT unless the socket connection truly is broken if tcpConn, ok := dial.(*net.TCPConn); ok { tcpConn.SetKeepAlive(true) tcpConn.SetKeepAlivePeriod(30 * time.Second) } if err != nil { if strings.Contains(err.Error(), "connection refused") { return fmt.Errorf("Cannot connect to the Docker daemon. Is ‘docker daemon‘ running on this host?") } return err } clientconn := httputil.NewClientConn(dial, nil) defer clientconn.Close() // Server hijacks the connection, error ‘connection closed‘ expected clientconn.Do(req) //对应Daemon的输出的第二条 rwc, br := clientconn.Hijack() //clear out the buffer 清理掉缓冲区的数据 defer rwc.Close() if started != nil { started <- rwc } var receiveStdout chan error var oldState *term.State //是否需要分配伪终端 if in != nil && setRawTerminal && cli.isTerminalIn && os.Getenv("NORAW") == "" { oldState, err = term.SetRawTerminal(cli.inFd) if err != nil { return err } defer term.RestoreTerminal(cli.inFd, oldState) } if stdout != nil || stderr != nil { receiveStdout = promise.Go(func() (err error) { defer func() { if in != nil { if setRawTerminal && cli.isTerminalIn { term.RestoreTerminal(cli.inFd, oldState) } // For some reason this Close call blocks on darwin.. // As the client exists right after, simply discard the close // until we find a better solution. if runtime.GOOS != "darwin" { in.Close() } } }() //IO输出流的重定向在这里 // When TTY is ON, use regular copy if setRawTerminal && stdout != nil { _, err = io.Copy(stdout, br) } else { _, err = stdcopy.StdCopy(stdout, stderr, br) } logrus.Debugf("[hijack] End of stdout") return err }) } sendStdin := promise.Go(func() error { //IO输入流的重定向在这里 if in != nil { io.Copy(rwc, in) logrus.Debugf("[hijack] End of stdin") } if conn, ok := rwc.(interface { CloseWrite() error }); ok { if err := conn.CloseWrite(); err != nil { logrus.Debugf("Couldn‘t send EOF: %s", err) } } // Discard errors due to pipe interruption return nil }) //阻塞在这里 if stdout != nil || stderr != nil { if err := <-receiveStdout; err != nil { logrus.Debugf("Error receiveStdout: %s", err) return err } } if !cli.isTerminalIn { if err := <-sendStdin; err != nil { logrus.Debugf("Error sendStdin: %s", err) return err } } return nil }
总结,重定向后,如果不是在后台执行就一直阻塞在那里,直到kill掉该进程
标签:
原文地址:http://my.oschina.net/yang1992/blog/523737