标签:
首先,做一个docker exec的请求:
docker exec -it 5504f937f7bb sh
对应的docker -d(启动的docker daemon)的输出为:
INFO[0211] POST /v1.20/containers/5504f937f7bb/exec
INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/start
INFO[0211] POST /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/resize?h=69&w=236
// 执行exit退出docker exec启动的tty后
2015/10/29 00:47:51 http: response.WriteHeader on hijacked connection
INFO[0294] GET /v1.20/exec/fc9c11ae6ac4827ea507e885c888bdb37c8f7b906347b9272adf8d580a6417df/json
我自己创建了一个docker exec的main函数,模拟docker exec的客户端执行流程(入口执行):
package main
import (
_"fmt"
"github.com/docker/docker/api/client"
)
// main is the entry point of the simulated docker exec client; it only calls
// client.DoEexc.
func main() {
	client.DoEexc()
}
然后是模拟docker exec请求的过程,自己重新写了一个,跳过flag模块对参数的解释,直接上静态的参数
package client
import (
"encoding/json"
"fmt"
"io"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/cli"
flag "github.com/docker/docker/pkg/mflag"
"github.com/docker/docker/pkg/promise"
"github.com/docker/docker/pkg/term"
_ "github.com/docker/docker/runconfig"
)
// ExecConfig describes one exec request: which container to run in, the
// command to run, and which standard streams to attach. CmdExec fills it with
// static values and passes it to newcli.call when creating the exec instance.
type ExecConfig struct {
	User         string   // User that will run the command
	Privileged   bool     // Is the container in privileged mode
	Tty          bool     // Attach standard streams to a tty.
	Container    string   // Name of the container (to execute in)
	AttachStdin  bool     // Attach the standard input, makes possible user interaction
	AttachStderr bool     // Attach the standard error
	AttachStdout bool     // Attach the standard output
	Detach       bool     // Execute in detach mode
	Cmd          []string // Execution commands and args
}
// Flag containers required to construct a DockerCli. Both start from an empty
// FlagSet; clientFlags shares commonFlags through its Common field and is the
// value DoEexc hands to NewDockerCli.
var (
	commonFlags = &cli.CommonFlags{FlagSet: &flag.FlagSet{}}
	clientFlags = &cli.ClientFlags{
		FlagSet: &flag.FlagSet{},
		Common:  commonFlags,
	}
)
// DoEexc drives the simulated `docker exec` client: it builds a DockerCli on
// the standard streams returned by term.StdStreams, initializes it, and then
// runs CmdExec.
//
// The function has no return value, so failures are printed. When
// initialization fails the exec request is not attempted, because the CLI
// would be used in a state its init method reported as broken. An error
// returned by CmdExec is printed instead of being silently dropped.
func DoEexc() {
	stdin, stdout, stderr := term.StdStreams()
	newcli := NewDockerCli(stdin, stdout, stderr, clientFlags)
	if err := newcli.init(); err != nil {
		fmt.Printf("docker cli init err %s \n", err)
		return
	}
	if err := CmdExec(newcli); err != nil {
		fmt.Printf("docker exec err %s \n", err)
	}
}
func CmdExec(newcli *DockerCli) error {
execConfig := &ExecConfig{
User: "",
Privileged: false,
Tty: true,
Cmd: []string{"sh"},
Container: "5504f937f7bb",
Detach: false,
AttachStdin: true,
AttachStderr: true,
AttachStdout: true,
}
//对应daemon的第一个输出
serverResp, err := newcli.call("POST", "/containers/"+execConfig.Container+"/exec", execConfig, nil)
if err != nil {
return err
}
defer serverResp.body.Close()
var response types.ContainerExecCreateResponse
if err := json.NewDecoder(serverResp.body).Decode(&response); err != nil {
return err
}
execID := response.ID
//Daemon会分配一个execID
fmt.Printf("response id %d \n", execID)
if execID == "" {
fmt.Fprintf(newcli.out, "exec ID empty")
return nil
}
//Temp struct for execStart so that we don‘t need to transfer all the execConfig
execStartCheck := &types.ExecStartCheck{
Detach: execConfig.Detach,
Tty: execConfig.Tty,
}
if !execConfig.Detach {
if err := newcli.CheckTtyInput(execConfig.AttachStdin, execConfig.Tty); err != nil {
return err
}
} else {
if _, _, err := readBody(newcli.call("POST", "/exec/"+execID+"/start", execStartCheck, nil)); err != nil {
return err
}
// For now don‘t print this - wait for when we support exec wait()
// fmt.Fprintf(cli.out, "%s\n", execID)
return nil
}
// 这个地方就是IO交互的处理流程,就是绑定tty,输出流重定向的过程
var (
out, stderr io.Writer
in io.ReadCloser
hijacked = make(chan io.Closer)
errCh chan error
)
// Block the return until the chan gets closed
defer func() {
logrus.Debugf("End of CmdExec(), Waiting for hijack to finish.")
if _, ok := <-hijacked; ok {
fmt.Fprintln(newcli.err, "Hijack did not finish (chan still open)")
}
}()
if execConfig.AttachStdin {
in = newcli.in
}
if execConfig.AttachStdout {
out = newcli.out
}
if execConfig.AttachStderr {
if execConfig.Tty {
stderr = newcli.out
} else {
stderr = newcli.err
}
}
//这个就是最关键的执行函数
errCh = promise.Go(func() error {
return newcli.hijack("POST", "/exec/"+execID+"/start", execConfig.Tty, in, out, stderr, hijacked, execConfig)
})
// Acknowledge the hijack before starting
select {
case closer := <-hijacked:
// Make sure that hijack gets closed when returning. (result
// in closing hijack chan and freeing server‘s goroutines.
if closer != nil {
defer closer.Close()
}
case err := <-errCh:
if err != nil {
logrus.Debugf("Error hijack: %s", err)
return err
}
}
if execConfig.Tty && newcli.isTerminalIn {
if err := newcli.monitorTtySize(execID, true); err != nil {
fmt.Fprintf(newcli.err, "Error monitoring TTY size: %s\n", err)
}
}
if err := <-errCh; err != nil {
logrus.Debugf("Error hijack: %s", err)
return err
}
var status int
if _, status, err = getExecExitCode(newcli, execID); err != nil {
return err
}
if status != 0 {
return newcli.StatusError{StatusCode: status}
}
return nil
}来让我们分析分析最关键的那个newcli.hijack的执行流程
func (cli *DockerCli) hijack(method, path string, setRawTerminal bool, in io.ReadCloser, stdout, stderr io.Writer, started chan io.Closer, data interface{}) error {
defer func() {
if started != nil {
close(started)
}
}()
params, err := cli.encodeData(data)
if err != nil {
return err
}
req, err := http.NewRequest(method, fmt.Sprintf("%s/v%s%s", cli.basePath, api.Version, path), params)
if err != nil {
return err
}
// Add CLI Config‘s HTTP Headers BEFORE we set the Docker headers
// then the user can‘t change OUR headers
for k, v := range cli.configFile.HTTPHeaders {
req.Header.Set(k, v)
}
//
req.Header.Set("User-Agent", "Docker-Client/"+dockerversion.VERSION+" ("+runtime.GOOS+")")
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", "tcp")
req.Host = cli.addr
dial, err := cli.dial() //新建立的连接
// When we set up a TCP connection for hijack, there could be long periods
// of inactivity (a long running command with no output) that in certain
// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
// state. Setting TCP KeepAlive on the socket connection will prohibit
// ECONNTIMEOUT unless the socket connection truly is broken
if tcpConn, ok := dial.(*net.TCPConn); ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
return fmt.Errorf("Cannot connect to the Docker daemon. Is ‘docker daemon‘ running on this host?")
}
return err
}
clientconn := httputil.NewClientConn(dial, nil)
defer clientconn.Close()
// Server hijacks the connection, error ‘connection closed‘ expected
clientconn.Do(req) //对应Daemon的输出的第二条
rwc, br := clientconn.Hijack() //clear out the buffer 清理掉缓冲区的数据
defer rwc.Close()
if started != nil {
started <- rwc
}
var receiveStdout chan error
var oldState *term.State
//是否需要分配伪终端
if in != nil && setRawTerminal && cli.isTerminalIn && os.Getenv("NORAW") == "" {
oldState, err = term.SetRawTerminal(cli.inFd)
if err != nil {
return err
}
defer term.RestoreTerminal(cli.inFd, oldState)
}
if stdout != nil || stderr != nil {
receiveStdout = promise.Go(func() (err error) {
defer func() {
if in != nil {
if setRawTerminal && cli.isTerminalIn {
term.RestoreTerminal(cli.inFd, oldState)
}
// For some reason this Close call blocks on darwin..
// As the client exists right after, simply discard the close
// until we find a better solution.
if runtime.GOOS != "darwin" {
in.Close()
}
}
}()
//IO输出流的重定向在这里
// When TTY is ON, use regular copy
if setRawTerminal && stdout != nil {
_, err = io.Copy(stdout, br)
} else {
_, err = stdcopy.StdCopy(stdout, stderr, br)
}
logrus.Debugf("[hijack] End of stdout")
return err
})
}
sendStdin := promise.Go(func() error {
//IO输入流的重定向在这里
if in != nil {
io.Copy(rwc, in)
logrus.Debugf("[hijack] End of stdin")
}
if conn, ok := rwc.(interface {
CloseWrite() error
}); ok {
if err := conn.CloseWrite(); err != nil {
logrus.Debugf("Couldn‘t send EOF: %s", err)
}
}
// Discard errors due to pipe interruption
return nil
})
//阻塞在这里
if stdout != nil || stderr != nil {
if err := <-receiveStdout; err != nil {
logrus.Debugf("Error receiveStdout: %s", err)
return err
}
}
if !cli.isTerminalIn {
if err := <-sendStdin; err != nil {
logrus.Debugf("Error sendStdin: %s", err)
return err
}
}
return nil
}总结,重定向后,如果不是在后台执行就一直阻塞在那里,直到kill掉该进程
标签:
原文地址:http://my.oschina.net/yang1992/blog/523737