diff --git a/api/agent/agent.go b/api/agent/agent.go index d98067045..e516b0bcc 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -794,6 +794,10 @@ func NewHotContainer(call *call, isBlockIdleIO bool) (*container, func()) { // when not processing a request, do we block IO? if !isBlockIdleIO { + // IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed + // from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH* + // have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue + // reading from the other one without risking head-of-line blocking. stderr.Swap(newLineWriter(&logWriter{ logrus.WithFields(logrus.Fields{"tag": "stderr", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}), })) diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 166467e0e..247fcbf8b 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -333,7 +333,7 @@ func TestLoggerIsStringerAndWorks(t *testing.T) { // TODO test limit writer, logrus writer, etc etc loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true}) - logger := setupLogger(loggyloo) + logger := setupLogger(loggyloo, 1*1024*1024) if _, ok := logger.(fmt.Stringer); !ok { // NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization... 
@@ -354,6 +354,40 @@ func TestLoggerIsStringerAndWorks(t *testing.T) { // TODO we could check for the toilet to flush here to logrus } +func TestLoggerTooBig(t *testing.T) { + + loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true}) + logger := setupLogger(loggyloo, 10) + + str := fmt.Sprintf("0 line\n1 l\n-----max log size 10 bytes exceeded, truncating log-----\n") + + n, err := logger.Write([]byte(str)) + if err != nil { + t.Fatalf("err returned, but should not fail err=%v n=%d", err, n) + } + if n != len(str) { + t.Fatalf("n should be %d, but got=%d", len(str), n) + } + + // oneeeeee moreeee time... (cue in Daft Punk), the results appear as if we wrote + // again... But only "limit" bytes should succeed, ignoring the subsequent writes... + n, err = logger.Write([]byte(str)) + if err != nil { + t.Fatalf("err returned, but should not fail err=%v n=%d", err, n) + } + if n != len(str) { + t.Fatalf("n should be %d, but got=%d", len(str), n) + } + + strGot := logger.(fmt.Stringer).String() + + if strGot != str { + t.Fatalf("logs did not match expectations, like being an adult got=\n%v\nexpected=\n%v\n", strGot, str) + } + + logger.Close() +} + func TestSubmitError(t *testing.T) { appName := "myapp" path := "/" diff --git a/api/agent/call.go b/api/agent/call.go index 41408d24e..2edbc3f1d 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -240,7 +240,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { // setup stderr logger separate (don't inherit ctx vars) logger := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": c.AppName, "path": c.Path, "image": c.Image, "call_id": c.ID}) - c.stderr = setupLogger(logger) + c.stderr = setupLogger(logger, a.cfg.MaxLogSize) if c.w == nil { // send STDOUT to logs if no writer given (async...) // TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?) 
diff --git a/api/agent/config.go b/api/agent/config.go index 2a4f2a049..a5730fa6b 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -2,6 +2,7 @@ package agent import ( "errors" + "fmt" "math" "os" "strconv" @@ -13,6 +14,7 @@ type AgentConfig struct { FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"` EjectIdleMsecs time.Duration `json:"eject_idle_msecs"` MaxResponseSize uint64 `json:"max_response_size"` + MaxLogSize uint64 `json:"max_log_size"` } var MaxDisabledMsecs = time.Duration(math.MaxInt64) @@ -23,6 +25,7 @@ func NewAgentConfig() (*AgentConfig, error) { cfg := &AgentConfig{ MinDockerVersion: "17.06.0-ce", + MaxLogSize: 1 * 1024 * 1024, } cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond) @@ -30,6 +33,17 @@ func NewAgentConfig() (*AgentConfig, error) { return cfg, errors.New("error initializing freeze idle delay") } + if tmp := os.Getenv("FN_MAX_LOG_SIZE"); tmp != "" { + cfg.MaxLogSize, err = strconv.ParseUint(tmp, 10, 64) + if err != nil { + return cfg, errors.New("error initializing max log size") + } + // for safety during uint64 to int conversions in Write()/Read(), etc. 
+ if cfg.MaxLogSize > math.MaxInt32 { + return cfg, fmt.Errorf("error invalid max log size %v > %v", cfg.MaxLogSize, math.MaxInt32) + } + } + cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond) if err != nil { return cfg, errors.New("error initializing eject idle delay") @@ -39,14 +53,11 @@ func NewAgentConfig() (*AgentConfig, error) { return cfg, errors.New("error eject idle delay cannot be zero") } - if size := os.Getenv("FN_MAX_RESPONSE_SIZE"); size != "" { - cfg.MaxResponseSize, err = strconv.ParseUint(size, 10, 64) + if tmp := os.Getenv("FN_MAX_RESPONSE_SIZE"); tmp != "" { + cfg.MaxResponseSize, err = strconv.ParseUint(tmp, 10, 64) if err != nil { return cfg, errors.New("error initializing response buffer size") } - if cfg.MaxResponseSize < 0 { - return cfg, errors.New("error invalid response buffer size") - } } return cfg, nil diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 4a79a5a0f..03ef61727 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -145,6 +145,8 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Image: task.Image(), Volumes: map[string]struct{}{}, OpenStdin: true, + AttachStdout: true, + AttachStderr: true, AttachStdin: true, StdinOnce: true, }, diff --git a/api/agent/func_logger.go b/api/agent/func_logger.go index 1002492b6..4c2e11dff 100644 --- a/api/agent/func_logger.go +++ b/api/agent/func_logger.go @@ -2,7 +2,6 @@ package agent import ( "bytes" - "errors" "fmt" "io" "sync" @@ -18,7 +17,7 @@ var ( // setupLogger returns an io.ReadWriteCloser which may write to multiple io.Writer's, // and may be read from the returned io.Reader (singular). After Close is called, // the Reader is not safe to read from, nor the Writer to write to. 
// limitDiscardWriter is an io.Writer that forwards at most max bytes to the
// embedded Writer and discards everything past that point. The first time
// input is actually dropped it appends a one-line truncation notice to the
// embedded Writer. Discarded bytes are still reported as written so that
// callers do not fail once the limit has been reached.
// TODO change to use clamp writer, this is dupe code
type limitDiscardWriter struct {
	n, max    int  // n counts bytes forwarded so far; max is the byte limit
	truncated bool // true once the truncation notice has been emitted
	io.Writer
}

// newLimitWriter returns a writer that passes at most max bytes through to w.
// A max of zero or less discards all input without emitting any notice.
func newLimitWriter(max int, w io.Writer) io.Writer {
	return &limitDiscardWriter{max: max, Writer: w}
}

// Write forwards as much of b as still fits under the limit and drops the
// rest. It returns len(b) and the underlying error (if any) whenever the
// accepted portion was fully written; a genuine short write from the
// underlying Writer is reported as is.
func (l *limitDiscardWriter) Write(b []byte) (int, error) {
	inpLen := len(b)

	// A non-positive limit means nothing is ever stored and no notice is written.
	if l.max <= 0 {
		return inpLen, nil
	}

	remaining := l.max - l.n
	if remaining < 0 {
		remaining = 0
	}

	// Everything fits, including a write that lands exactly on the limit:
	// nothing is dropped, so no truncation notice is due yet.
	if remaining > 0 && inpLen <= remaining {
		n, err := l.Writer.Write(b)
		l.n += n
		return n, err
	}

	// Limit already reached and there is nothing to discard.
	if inpLen == 0 {
		return 0, nil
	}

	var err error
	if remaining > 0 {
		// cut off to prevent gigantic line attack
		n, werr := l.Writer.Write(b[:remaining])
		l.n += n
		if n != remaining {
			// Is this truly a partial write? We'll be honest if that's the case.
			return n, werr
		}
		err = werr
	}

	if !l.truncated {
		// write in truncation message to log once; best effort, a failure
		// to record the notice must not fail the caller's write.
		l.truncated = true
		_, _ = fmt.Fprintf(l.Writer, "\n-----max log size %d bytes exceeded, truncating log-----\n", l.max)
	}

	// yes, we lie... this is to prevent callers from blowing up, we always
	// pretend that we were able to write the entire buffer.
	return inpLen, err
}