mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
onto the error checking
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/docker/docker/api/types/container"
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/fnproject/fn/api/agent/drivers"
|
||||
driverstats "github.com/fnproject/fn/api/agent/drivers/stats"
|
||||
"github.com/fnproject/fn/api/common"
|
||||
@@ -450,6 +451,7 @@ func (drv *DockerDriver) run(ctx context.Context, task drivers.ContainerTask) (d
|
||||
stdout, stderr := task.Logger()
|
||||
successChan := make(chan struct{})
|
||||
|
||||
// TODO(reed): if all 3 of these are off, we don't need to attach really?
|
||||
_, stdinOff := stdin.(common.NoopReadWriteCloser)
|
||||
_, stdoutOff := stdout.(common.NoopReadWriteCloser)
|
||||
_, stderrOff := stderr.(common.NoopReadWriteCloser)
|
||||
@@ -484,13 +486,10 @@ func (drv *DockerDriver) run(ctx context.Context, task drivers.ContainerTask) (d
|
||||
go func() {
|
||||
cErr <- func() error {
|
||||
streamer := hijackedIOStreamer{
|
||||
streams: dockerCli,
|
||||
inputStream: in,
|
||||
outputStream: dockerCli.Out(),
|
||||
errorStream: dockerCli.Err(),
|
||||
inputStream: stdin,
|
||||
outputStream: stdout,
|
||||
errorStream: stderr,
|
||||
resp: resp,
|
||||
tty: c.Config.Tty,
|
||||
detachKeys: options.DetachKeys,
|
||||
}
|
||||
|
||||
errHijack := streamer.stream(ctx)
|
||||
@@ -502,7 +501,7 @@ func (drv *DockerDriver) run(ctx context.Context, task drivers.ContainerTask) (d
|
||||
}()
|
||||
}()
|
||||
|
||||
err = drv.docker.ContainerStart(ctx, container, types.ContainerStartOptions{})
|
||||
err := drv.docker.ContainerStart(ctx, container, types.ContainerStartOptions{})
|
||||
if err != nil {
|
||||
cancel() // make sure we shut down stats / attach & close body
|
||||
<-cErr
|
||||
@@ -530,8 +529,7 @@ func (drv *DockerDriver) run(ctx context.Context, task drivers.ContainerTask) (d
|
||||
// A hijackedIOStreamer handles copying input to and output from streams to the
|
||||
// connection.
|
||||
type hijackedIOStreamer struct {
|
||||
streams command.Streams
|
||||
inputStream io.ReadCloser
|
||||
inputStream io.Reader
|
||||
outputStream io.Writer
|
||||
errorStream io.Writer
|
||||
|
||||
@@ -546,15 +544,8 @@ type hijackedIOStreamer struct {
|
||||
// output, the user inputs the detach key sequence when in TTY mode, or when
|
||||
// the given context is cancelled.
|
||||
func (h *hijackedIOStreamer) stream(ctx context.Context) error {
|
||||
restoreInput, err := h.setupInput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to setup input stream: %s", err)
|
||||
}
|
||||
|
||||
defer restoreInput()
|
||||
|
||||
outputDone := h.beginOutputStream(restoreInput)
|
||||
inputDone, detached := h.beginInputStream(restoreInput)
|
||||
outputDone := h.beginOutputStream()
|
||||
inputDone := h.beginInputStream()
|
||||
|
||||
select {
|
||||
case err := <-outputDone:
|
||||
@@ -571,72 +562,20 @@ func (h *hijackedIOStreamer) stream(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
return nil
|
||||
case err := <-detached:
|
||||
// Got a detach key sequence.
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *hijackedIOStreamer) setupInput() (restore func(), err error) {
|
||||
if h.inputStream == nil || !h.tty {
|
||||
// No need to setup input TTY.
|
||||
// The restore func is a nop.
|
||||
return func() {}, nil
|
||||
}
|
||||
|
||||
if err := setRawTerminal(h.streams); err != nil {
|
||||
return nil, fmt.Errorf("unable to set IO streams as raw terminal: %s", err)
|
||||
}
|
||||
|
||||
// Use sync.Once so we may call restore multiple times but ensure we
|
||||
// only restore the terminal once.
|
||||
var restoreOnce sync.Once
|
||||
restore = func() {
|
||||
restoreOnce.Do(func() {
|
||||
restoreTerminal(h.streams, h.inputStream)
|
||||
})
|
||||
}
|
||||
|
||||
// Wrap the input to detect detach escape sequence.
|
||||
// Use default escape keys if an invalid sequence is given.
|
||||
escapeKeys := defaultEscapeKeys
|
||||
if h.detachKeys != "" {
|
||||
customEscapeKeys, err := term.ToBytes(h.detachKeys)
|
||||
if err != nil {
|
||||
logrus.Warnf("invalid detach escape keys, using default: %s", err)
|
||||
} else {
|
||||
escapeKeys = customEscapeKeys
|
||||
}
|
||||
}
|
||||
|
||||
h.inputStream = ioutils.NewReadCloserWrapper(term.NewEscapeProxy(h.inputStream, escapeKeys), h.inputStream.Close)
|
||||
|
||||
return restore, nil
|
||||
}
|
||||
|
||||
func (h *hijackedIOStreamer) beginOutputStream(restoreInput func()) <-chan error {
|
||||
func (h *hijackedIOStreamer) beginOutputStream() <-chan error {
|
||||
if h.outputStream == nil && h.errorStream == nil {
|
||||
// There is no need to copy output.
|
||||
return nil
|
||||
}
|
||||
|
||||
outputDone := make(chan error)
|
||||
outputDone := make(chan error, 1)
|
||||
go func() {
|
||||
var err error
|
||||
|
||||
// When TTY is ON, use regular copy
|
||||
if h.outputStream != nil && h.tty {
|
||||
_, err = io.Copy(h.outputStream, h.resp.Reader)
|
||||
// We should restore the terminal as soon as possible
|
||||
// once the connection ends so any following print
|
||||
// messages will be in normal type.
|
||||
restoreInput()
|
||||
} else {
|
||||
_, err = stdcopy.StdCopy(h.outputStream, h.errorStream, h.resp.Reader)
|
||||
}
|
||||
|
||||
_, err := stdcopy.StdCopy(h.outputStream, h.errorStream, h.resp.Reader)
|
||||
logrus.Debug("[hijack] End of stdout")
|
||||
|
||||
if err != nil {
|
||||
@@ -649,25 +588,14 @@ func (h *hijackedIOStreamer) beginOutputStream(restoreInput func()) <-chan error
|
||||
return outputDone
|
||||
}
|
||||
|
||||
func (h *hijackedIOStreamer) beginInputStream(restoreInput func()) (doneC <-chan struct{}, detachedC <-chan error) {
|
||||
func (h *hijackedIOStreamer) beginInputStream() (doneC <-chan struct{}) {
|
||||
inputDone := make(chan struct{})
|
||||
detached := make(chan error)
|
||||
|
||||
go func() {
|
||||
if h.inputStream != nil {
|
||||
_, err := io.Copy(h.resp.Conn, h.inputStream)
|
||||
// We should restore the terminal as soon as possible
|
||||
// once the connection ends so any following print
|
||||
// messages will be in normal type.
|
||||
restoreInput()
|
||||
|
||||
logrus.Debug("[hijack] End of stdin")
|
||||
|
||||
if _, ok := err.(term.EscapeError); ok {
|
||||
detached <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// This error will also occur on the receive
|
||||
// side (from stdout) where it will be
|
||||
@@ -683,36 +611,7 @@ func (h *hijackedIOStreamer) beginInputStream(restoreInput func()) (doneC <-chan
|
||||
close(inputDone)
|
||||
}()
|
||||
|
||||
return inputDone, detached
|
||||
}
|
||||
|
||||
func setRawTerminal(streams command.Streams) error {
|
||||
if err := streams.In().SetRawTerminal(); err != nil {
|
||||
return err
|
||||
}
|
||||
return streams.Out().SetRawTerminal()
|
||||
}
|
||||
|
||||
// nolint: unparam
|
||||
func restoreTerminal(streams command.Streams, in io.Closer) error {
|
||||
streams.In().RestoreTerminal()
|
||||
streams.Out().RestoreTerminal()
|
||||
// WARNING: DO NOT REMOVE THE OS CHECKS !!!
|
||||
// For some reason this Close call blocks on darwin..
|
||||
// As the client exits right after, simply discard the close
|
||||
// until we find a better solution.
|
||||
//
|
||||
// This can also cause the client on Windows to get stuck in Win32 CloseHandle()
|
||||
// in some cases. See https://github.com/docker/docker/issues/28267#issuecomment-288237442
|
||||
// Tracked internally at Microsoft by VSO #11352156. In the
|
||||
// Windows case, you hit this if you are using the native/v2 console,
|
||||
// not the "legacy" console, and you start the client in a new window. eg
|
||||
// `start docker run --rm -it microsoft/nanoserver cmd /s /c echo foobar`
|
||||
// will hang. Remove start, and it won't repro.
|
||||
if in != nil && runtime.GOOS != "darwin" && runtime.GOOS != "windows" {
|
||||
return in.Close()
|
||||
}
|
||||
return nil
|
||||
return inputDone
|
||||
}
|
||||
|
||||
// isSyslogError checks if the error message is what docker syslog plugin returns
|
||||
|
||||
Reference in New Issue
Block a user