diff --git a/api/agent/call.go b/api/agent/call.go index 1b8bc5463..16ae89a29 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -172,6 +172,14 @@ func WithWriter(w io.Writer) CallOpt { } } +// WithLogger sets stderr to the provided one +func WithLogger(w io.ReadWriteCloser) CallOpt { + return func(c *call) error { + c.stderr = w + return nil + } +} + // InvokeDetached mark a call to be a detached call func InvokeDetached() CallOpt { return func(c *call) error { @@ -198,8 +206,6 @@ func WithExtensions(extensions map[string]string) CallOpt { } // GetCall builds a Call that can be used to submit jobs to the agent. -// -// TODO where to put this? async and sync both call this func (a *agent) GetCall(opts ...CallOpt) (Call, error) { var c call @@ -241,8 +247,11 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { c.handler = a.da c.ct = a - // TODO(reed): is line writer is vulnerable to attack? - c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call) + if c.stderr == nil { + // TODO(reed): is line writer is vulnerable to attack? + // XXX(reed): forcing this as default is not great / configuring it isn't great either. reconsider. + c.stderr = setupLogger(c.req.Context(), a.cfg.MaxLogSize, !a.cfg.DisableDebugUserLogs, c.Call) + } if c.w == nil { // send STDOUT to logs if no writer given (async...) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 6fe34a7ec..7f9bf0a5d 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -219,18 +219,20 @@ func (drv *DockerDriver) CreateCookie(ctx context.Context, task drivers.Containe ctx, log := common.LoggerWithFields(ctx, logrus.Fields{"stack": "CreateCookie"}) - stdinOn := task.Input() != nil - // XXX(reed): we can do same with stderr/stdout + _, stdinOff := task.Input().(common.NoopReadWriteCloser) + stdout, stderr := task.Logger() + _, stdoutOff := stdout.(common.NoopReadWriteCloser) + _, stderrOff := stderr.(common.NoopReadWriteCloser) opts := docker.CreateContainerOptions{ Name: task.Id(), Config: &docker.Config{ Image: task.Image(), - OpenStdin: stdinOn, - StdinOnce: stdinOn, - AttachStdin: stdinOn, - AttachStdout: true, - AttachStderr: true, + OpenStdin: !stdinOff, + StdinOnce: !stdinOff, + AttachStdin: !stdinOff, + AttachStdout: !stdoutOff, + AttachStderr: !stderrOff, }, HostConfig: &docker.HostConfig{ ReadonlyRootfs: drv.conf.EnableReadOnlyRootFs, @@ -284,6 +286,11 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers mwOut, mwErr := task.Logger() successChan := make(chan struct{}) + _, stdinOff := task.Input().(common.NoopReadWriteCloser) + stdout, stderr := task.Logger() + _, stdoutOff := stdout.(common.NoopReadWriteCloser) + _, stderrOff := stderr.(common.NoopReadWriteCloser) + waiter, err := drv.docker.AttachToContainerNonBlocking(ctx, docker.AttachToContainerOptions{ Container: container, InputStream: task.Input(), @@ -291,9 +298,9 @@ func (drv *DockerDriver) run(ctx context.Context, container string, task drivers ErrorStream: mwErr, Success: successChan, Stream: true, - Stdout: true, - Stderr: true, - Stdin: true, + Stdout: !stdoutOff, + Stderr: !stderrOff, + Stdin: !stdinOff, }) if err == nil { diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index cb233995b..d99f78796 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -87,20 +87,6 @@ type nopCloser struct { func (n *nopCloser) Close() error { return nil } -type nullReadWriter struct { - io.ReadCloser -} - -func (n nullReadWriter) Close() error { - return nil -} -func (n nullReadWriter) Read(b []byte) (int, error) { - return 0, io.EOF -} -func (n nullReadWriter) Write(b []byte) (int, error) { - return len(b), io.EOF -} - // multiWriteCloser ignores all errors from inner writers. you say, oh, this is a bad idea? // yes, well, we were going to silence them all individually anyway, so let's not be shy about it. // the main thing we need to ensure is that every close is called, even if another errors. diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 4d9657607..84b18c4ea 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -156,7 +156,7 @@ func (a *lbAgent) GetCall(opts ...CallOpt) (Call, error) { c.handler = a.cda c.ct = a - c.stderr = &nullReadWriter{} + c.stderr = common.NoopReadWriteCloser{} c.slotHashId = getSlotQueueKey(&c) return &c, nil } diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go index 720628346..b01a56de8 100644 --- a/api/agent/pure_runner.go +++ b/api/agent/pure_runner.go @@ -649,7 +649,8 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error c.StartedAt = common.DateTime(time.Time{}) c.CompletedAt = common.DateTime(time.Time{}) - agent_call, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), + agentCall, err := pr.a.GetCall(FromModelAndInput(&c, state.pipeToFnR), + WithLogger(common.NoopReadWriteCloser{}), WithWriter(state), WithContext(state.ctx), WithExtensions(tc.GetExtensions()), @@ -659,14 +660,14 @@ func (pr *pureRunner) handleTryCall(tc *runner.TryCall, state *callHandle) error return err } - state.c = agent_call.(*call) + state.c = agentCall.(*call) if tc.SlotHashId != "" { - hashId, err := hex.DecodeString(tc.SlotHashId) + hashID, err := hex.DecodeString(tc.SlotHashId) if err != nil { state.enqueueCallResponse(err) return err } - state.c.slotHashId = string(hashId[:]) + state.c.slotHashId = string(hashID[:]) } if state.c.Type == models.TypeDetached { @@ -784,14 +785,15 @@ func (pr *pureRunner) runStatusCall(ctx context.Context) *runner.RunnerStatus { recorder := httptest.NewRecorder() player := ioutil.NopCloser(strings.NewReader(c.Payload)) - agent_call, err := pr.a.GetCall(FromModelAndInput(&c, player), + agentCall, err := pr.a.GetCall(FromModelAndInput(&c, player), + WithLogger(common.NoopReadWriteCloser{}), WithWriter(recorder), WithContext(execCtx), ) if err == nil { var mcall *call - mcall = agent_call.(*call) + mcall = agentCall.(*call) err = pr.a.Submit(mcall) } diff --git a/api/common/io_utils.go b/api/common/io_utils.go index 10dafcb05..001b003c6 100644 --- a/api/common/io_utils.go +++ b/api/common/io_utils.go @@ -5,6 +5,20 @@ import ( "sync" ) +// NoopReadWriteCloser implements io.ReadWriteCloser, discarding all bytes, Read always returns EOF +type NoopReadWriteCloser struct{} + +var _ io.ReadWriteCloser = NoopReadWriteCloser{} + +// Read implements io.Reader +func (n NoopReadWriteCloser) Read(b []byte) (int, error) { return 0, io.EOF } + +// Write implements io.Writer +func (n NoopReadWriteCloser) Write(b []byte) (int, error) { return len(b), nil } + +// Close implements io.Closer +func (n NoopReadWriteCloser) Close() error { return nil } + type clampWriter struct { w io.Writer remaining int64