diff --git a/api/agent/agent.go b/api/agent/agent.go index 24486b2e7..c36ac7ac1 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -652,7 +652,8 @@ func (s *hotSlot) dispatch(ctx context.Context, call *call) error { defer span.End() // TODO it's possible we can get rid of this (after getting rid of logs API) - may need for call id/debug mode still - swapBack := s.container.swap(call.stderr, call.stderr, &call.Stats) + // TODO there's a timeout race for swapping this back if the container doesn't get killed for timing out, and don't you forget it + swapBack := s.container.swap(call.stderr, &call.Stats) defer swapBack() resp, err := s.container.udsClient.Do(createUDSRequest(ctx, call)) @@ -1117,7 +1118,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, // container implements drivers.ContainerTask container is the execution of a // single container, which may run multiple functions [consecutively]. the id // and stderr can be swapped out by new calls in the container. input and -// output must be copied in and out. +// output must be copied in and out. stdout is sent to stderr. type container struct { id string // contrived image string @@ -1131,7 +1132,6 @@ type container struct { logCfg drivers.LoggerConfig close func() - stdout io.Writer stderr io.Writer udsClient http.Client @@ -1141,7 +1141,7 @@ type container struct { stats *drivers.Stats } -//newHotContainer creates a container that can be used for multiple sequential events +// newHotContainer creates a container that can be used for multiple sequential events func newHotContainer(ctx context.Context, call *call, cfg *Config, id string, udsWait chan error) *container { var iofs iofs @@ -1161,39 +1161,31 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config, id string, ud inotifyAwait(ctx, iofs.AgentPath(), udsWait) - stderr := common.NewGhostWriter() - stdout := common.NewGhostWriter() - - // for use if no freezer (or we ever make up our minds) - var bufs []*bytes.Buffer - // IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed // from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH* // have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue // reading from the other one without risking head-of-line blocking. - // wrap the syslog and debug loggers in the same (respective) line writer - // stderr -> line writer - - // TODO(reed): there's a bug here where the between writers could have - // bytes in there, get swapped for real stdout/stderr, come back and write - // bytes in and the bytes are [really] stale. I played with fixing this - // and mostly came to the conclusion that life is meaningless. // TODO(reed): we should let the syslog driver pick this up really but our // default story sucks there - buf1 := bufPool.Get().(*bytes.Buffer) - buf2 := bufPool.Get().(*bytes.Buffer) - bufs = []*bytes.Buffer{buf1, buf2} - soc := &nopCloser{&logWriter{ - logrus.WithFields(logrus.Fields{"tag": "stdout", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}), - }} - sec := &nopCloser{&logWriter{ - logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}), - }} + // disable container logs if they're disabled on the call (pure_runner) - + // users may use syslog to get container logs, unrelated to this writer. + // otherwise, make a line writer and allow logrus DEBUG logs to host stderr + // between function invocations from the container. - stdout.Swap(newLineWriterWithBuffer(buf1, soc)) - stderr.Swap(newLineWriterWithBuffer(buf2, sec)) + var bufs []*bytes.Buffer + var stderr io.WriteCloser = call.stderr + if _, ok := stderr.(common.NoopReadWriteCloser); !ok { + gw := common.NewGhostWriter() + buf1 := bufPool.Get().(*bytes.Buffer) + sec := &nopCloser{&logWriter{ + logrus.WithFields(logrus.Fields{"tag": "stderr", "app_id": call.AppID, "fn_id": call.FnID, "image": call.Image, "container_id": id}), + }} + gw.Swap(newLineWriterWithBuffer(buf1, sec)) + stderr = gw + bufs = append(bufs, buf1) + } return &container{ id: id, // XXX we could just let docker generate ids... @@ -1212,7 +1204,6 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config, id string, ud {Name: "fn_id", Value: call.FnID}, }, }, - stdout: stdout, stderr: stderr, udsClient: http.Client{ Transport: &http.Transport{ @@ -1228,7 +1219,6 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config, id string, ud }, close: func() { stderr.Close() - stdout.Close() for _, b := range bufs { bufPool.Put(b) } @@ -1239,19 +1229,22 @@ func newHotContainer(ctx context.Context, call *call, cfg *Config, id string, ud } } -func (c *container) swap(stdout, stderr io.Writer, cs *drivers.Stats) func() { - // if tests don't catch this, then fuck me - ostdout := c.stdout.(common.GhostWriter).Swap(stdout) - ostderr := c.stderr.(common.GhostWriter).Swap(stderr) - +func (c *container) swap(stderr io.Writer, cs *drivers.Stats) func() { + // if they aren't using a ghost writer, the logs are disabled, we can skip swapping + gw, ok := c.stderr.(common.GhostWriter) + var ostderr io.Writer + if ok { + ostderr = gw.Swap(stderr) + } c.swapMu.Lock() ocs := c.stats c.stats = cs c.swapMu.Unlock() return func() { - c.stdout.(common.GhostWriter).Swap(ostdout) - c.stderr.(common.GhostWriter).Swap(ostderr) + if ostderr != nil { + c.stderr.(common.GhostWriter).Swap(ostderr) + } c.swapMu.Lock() c.stats = ocs c.swapMu.Unlock() @@ -1260,8 +1253,8 @@ func (c *container) swap(stdout, stderr io.Writer, cs *drivers.Stats) func() { func (c *container) Id() string { return c.id } func (c *container) Command() string { return "" } -func (c *container) Input() io.Reader { return nil } -func (c *container) Logger() (io.Writer, io.Writer) { return c.stdout, c.stderr } +func (c *container) Input() io.Reader { return common.NoopReadWriteCloser{} } +func (c *container) Logger() (io.Writer, io.Writer) { return c.stderr, c.stderr } func (c *container) Volumes() [][2]string { return nil } func (c *container) WorkDir() string { return "" } func (c *container) Close() { c.close() } diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 1f80a415a..6e15c177f 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -21,6 +21,7 @@ import ( "time" _ "github.com/fnproject/fn/api/agent/drivers/docker" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/id" "github.com/fnproject/fn/api/logs" "github.com/fnproject/fn/api/models" @@ -1354,5 +1355,67 @@ func TestCheckSocketDestination(t *testing.T) { } }) } - +} + +func TestContainerDisableIO(t *testing.T) { + modelCall := &models.Call{ + AppID: id.New().String(), + FnID: id.New().String(), + Image: "fnproject/fn-test-utils", + Type: "sync", + Timeout: 1, + IdleTimeout: 2, + } + cfg, err := NewConfig() + if err != nil { + t.Fatalf("bad config %+v", cfg) + } + + ls := logs.NewMock() + a := New(NewDirectCallDataAccess(ls, new(mqs.Mock))) + defer checkClose(t, a) + + // NOTE: right now we disable stdin by default so this test should pass. + // if you're adding back stdin and this fails, that is why. + // NOTE: specify noop as the logger, stdout will get sent to stderr, + // and we should get back a noop writer from the container for both. + callIf, err := a.GetCall(FromModel(modelCall), + WithLogger(common.NoopReadWriteCloser{}), + ) + if err != nil { + t.Fatal(err) + } + call := callIf.(*call) + + ctx := context.TODO() + + errC := make(chan error, 10) + + c := newHotContainer(ctx, call, cfg, id.New().String(), errC) + if c == nil { + err := <-errC + t.Fatal("got unexpected err: ", err) + } + + // we need to test that our concrete container type returns the noop + // writers and readers to the docker driver (ie no decorators), which + // the docker driver currently uses to disable stdin/stdout/stderr at + // the container level (save some bytes) + + stdin := c.Input() + stdout, stderr := c.Logger() + + _, stdinOff := stdin.(common.NoopReadWriteCloser) + _, stdoutOff := stdout.(common.NoopReadWriteCloser) + _, stderrOff := stderr.(common.NoopReadWriteCloser) + + if !stdinOff { + t.Error("stdin is enabled, stdin should be disabled") + } + if !stdoutOff { + t.Error("stdout is enabled, stdout should be disabled") + } + if !stderrOff { + t.Error("stderr is enabled, stderr should be disabled") + } } diff --git a/api/agent/call.go b/api/agent/call.go index b6ce16f68..bb9e8125a 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -253,7 +253,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call) } if c.respWriter == nil { - // send STDOUT to logs if no writer given (async...) + // send function output to logs if no writer given (async...) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) c.respWriter = c.stderr } diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 7f9bf0a5d..4d61c79bb 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -283,19 +283,18 @@ func (drv *DockerDriver) removeContainer(ctx context.Context, container string) // The docker driver will attempt to cast the task to a Auther. If that succeeds, private image support is available. See the Auther interface for how to implement this. func (drv *DockerDriver) run(ctx context.Context, container string, task drivers.ContainerTask) (drivers.WaitResult, error) { - mwOut, mwErr := task.Logger() + stdout, stderr := task.Logger() successChan := make(chan struct{}) _, stdinOff := task.Input().(common.NoopReadWriteCloser) - stdout, stderr := task.Logger() _, stdoutOff := stdout.(common.NoopReadWriteCloser) _, stderrOff := stderr.(common.NoopReadWriteCloser) waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{ Container: container, InputStream: task.Input(), - OutputStream: mwOut, - ErrorStream: mwErr, + OutputStream: stdout, + ErrorStream: stderr, Success: successChan, Stream: true, Stdout: !stdoutOff, diff --git a/api/server/apps_test.go b/api/server/apps_test.go index 586680f22..c9d7fdc57 100644 --- a/api/server/apps_test.go +++ b/api/server/apps_test.go @@ -325,7 +325,6 @@ func TestAppUpdate(t *testing.T) { } if test.expectedError != nil { - fmt.Printf("resp: %s", rec.Body) resp := getErrorResponse(t, rec) if !strings.Contains(resp.Message, test.expectedError.Error()) {