fn: re-enable failing test (#826)

* fn: re-enable the failing test

* fn: fortify the stderr output

Modified limitWriter to discard excess data instead of returning an error.
This keeps the stderr/stdout pipes flowing and avoids head-of-line blocking
or data corruption in the container's stdout/stderr output stream.
Author: Tolga Ceylan
Date: 2018-03-09 09:57:28 -08:00
Committed by: Reed Allman
Parent: f85294b0fc
Commit: 7177bf3923
7 changed files with 81 additions and 25 deletions
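
For context on the limitWriter change described in the commit message, here is a minimal standalone sketch of the discard-on-limit pattern (illustrative only; the actual Fn implementation, including the truncation marker, appears in the diff below):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// limitDiscardWriter caps how many bytes reach the underlying writer. Once
// the cap is hit it keeps reporting success, so an upstream pipe (e.g. a
// container's stderr) never stalls on a write error.
type limitDiscardWriter struct {
	n, max int
	io.Writer
}

func (l *limitDiscardWriter) Write(b []byte) (int, error) {
	inLen := len(b)
	if l.n >= l.max {
		return inLen, nil // silently discard, keep the pipe draining
	}
	if l.n+inLen > l.max {
		b = b[:l.max-l.n] // truncate the chunk that crosses the limit
	}
	n, err := l.Writer.Write(b)
	l.n += n
	return inLen, err // report the full length so callers don't blow up
}

func main() {
	var buf bytes.Buffer
	w := &limitDiscardWriter{max: 10, Writer: &buf}
	w.Write([]byte("hello "))
	w.Write([]byte("world, this part is discarded"))
	fmt.Printf("stored %q\n", buf.String()) // stored "hello worl"
}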

@@ -794,6 +794,10 @@ func NewHotContainer(call *call, isBlockIdleIO bool) (*container, func()) {
 	// when not processing a request, do we block IO?
 	if !isBlockIdleIO {
+		// IMPORTANT: we are not operating on a TTY allocated container. This means, stderr and stdout are multiplexed
+		// from the same stream internally via docker using a multiplexing protocol. Therefore, stderr/stdout *BOTH*
+		// have to be read or *BOTH* blocked consistently. In other words, we cannot block one and continue
+		// reading from the other one without risking head-of-line blocking.
 		stderr.Swap(newLineWriter(&logWriter{
 			logrus.WithFields(logrus.Fields{"tag": "stderr", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}),
 		}))

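To illustrate the multiplexing constraint described in the new comment, here is a small sketch (not part of this commit) using Docker's stdcopy package, which implements this stream framing for non-TTY containers:

package main

import (
	"bytes"
	"fmt"

	"github.com/docker/docker/pkg/stdcopy"
)

func main() {
	// Simulate the single stream Docker exposes for a non-TTY container:
	// stdout and stderr frames are interleaved on one connection.
	var muxed bytes.Buffer
	stdcopy.NewStdWriter(&muxed, stdcopy.Stdout).Write([]byte("out line\n"))
	stdcopy.NewStdWriter(&muxed, stdcopy.Stderr).Write([]byte("err line\n"))

	// Demultiplexing reads frames in order from that one stream, so if either
	// destination writer blocks, frames destined for the other stream stop
	// flowing as well; both sides must keep draining.
	var stdout, stderr bytes.Buffer
	if _, err := stdcopy.StdCopy(&stdout, &stderr, &muxed); err != nil {
		panic(err)
	}
	fmt.Printf("stdout: %q\nstderr: %q\n", stdout.String(), stderr.String())
}
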
@@ -333,7 +333,7 @@ func TestLoggerIsStringerAndWorks(t *testing.T) {
 	// TODO test limit writer, logrus writer, etc etc
 	loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true})
-	logger := setupLogger(loggyloo)
+	logger := setupLogger(loggyloo, 1*1024*1024)
 	if _, ok := logger.(fmt.Stringer); !ok {
 		// NOTE: if you are reading, maybe what you've done is ok, but be aware we were relying on this for optimization...
@@ -354,6 +354,40 @@ func TestLoggerIsStringerAndWorks(t *testing.T) {
 	// TODO we could check for the toilet to flush here to logrus
 }
 
+func TestLoggerTooBig(t *testing.T) {
+	loggyloo := logrus.WithFields(logrus.Fields{"yodawg": true})
+	logger := setupLogger(loggyloo, 10)
+
+	str := fmt.Sprintf("0 line\n1 l\n-----max log size 10 bytes exceeded, truncating log-----\n")
+
+	n, err := logger.Write([]byte(str))
+	if err != nil {
+		t.Fatalf("err returned, but should not fail err=%v n=%d", err, n)
+	}
+	if n != len(str) {
+		t.Fatalf("n should be %d, but got=%d", len(str), n)
+	}
+
+	// oneeeeee moreeee time... (cue in Daft Punk), the results appear as if we wrote
+	// again... But only "limit" bytes should succeed, ignoring the subsequent writes...
+	n, err = logger.Write([]byte(str))
+	if err != nil {
+		t.Fatalf("err returned, but should not fail err=%v n=%d", err, n)
+	}
+	if n != len(str) {
+		t.Fatalf("n should be %d, but got=%d", len(str), n)
+	}
+
+	strGot := logger.(fmt.Stringer).String()
+	if strGot != str {
+		t.Fatalf("logs did not match expectations, like being an adult got=\n%v\nexpected=\n%v\n", strGot, str)
+	}
+
+	logger.Close()
+}
+
 func TestSubmitError(t *testing.T) {
 	appName := "myapp"
 	path := "/"

@@ -240,7 +240,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
 	// setup stderr logger separate (don't inherit ctx vars)
 	logger := logrus.WithFields(logrus.Fields{"user_log": true, "app_name": c.AppName, "path": c.Path, "image": c.Image, "call_id": c.ID})
-	c.stderr = setupLogger(logger)
+	c.stderr = setupLogger(logger, a.cfg.MaxLogSize)
 	if c.w == nil {
 		// send STDOUT to logs if no writer given (async...)
 		// TODO we could/should probably make this explicit to GetCall, ala 'WithLogger', but it's dupe code (who cares?)

@@ -2,6 +2,7 @@ package agent
 import (
 	"errors"
+	"fmt"
 	"math"
 	"os"
 	"strconv"
@@ -13,6 +14,7 @@ type AgentConfig struct {
 	FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"`
 	EjectIdleMsecs  time.Duration `json:"eject_idle_msecs"`
 	MaxResponseSize uint64        `json:"max_response_size"`
+	MaxLogSize      uint64        `json:"max_log_size"`
 }
 var MaxDisabledMsecs = time.Duration(math.MaxInt64)
@@ -23,6 +25,7 @@ func NewAgentConfig() (*AgentConfig, error) {
 	cfg := &AgentConfig{
 		MinDockerVersion: "17.06.0-ce",
+		MaxLogSize:       1 * 1024 * 1024,
 	}
 	cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
@@ -30,6 +33,17 @@ func NewAgentConfig() (*AgentConfig, error) {
 		return cfg, errors.New("error initializing freeze idle delay")
 	}
+	if tmp := os.Getenv("FN_MAX_LOG_SIZE"); tmp != "" {
+		cfg.MaxLogSize, err = strconv.ParseUint(tmp, 10, 64)
+		if err != nil {
+			return cfg, errors.New("error initializing max log size")
+		}
+		// for safety during uint64 to int conversions in Write()/Read(), etc.
+		if cfg.MaxLogSize > math.MaxInt32 {
+			return cfg, fmt.Errorf("error invalid max log size %v > %v", cfg.MaxLogSize, math.MaxInt32)
+		}
+	}
+
 	cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
 	if err != nil {
 		return cfg, errors.New("error initializing eject idle delay")
@@ -39,14 +53,11 @@ func NewAgentConfig() (*AgentConfig, error) {
 		return cfg, errors.New("error eject idle delay cannot be zero")
 	}
-	if size := os.Getenv("FN_MAX_RESPONSE_SIZE"); size != "" {
-		cfg.MaxResponseSize, err = strconv.ParseUint(size, 10, 64)
+	if tmp := os.Getenv("FN_MAX_RESPONSE_SIZE"); tmp != "" {
+		cfg.MaxResponseSize, err = strconv.ParseUint(tmp, 10, 64)
 		if err != nil {
 			return cfg, errors.New("error initializing response buffer size")
 		}
-		if cfg.MaxResponseSize < 0 {
-			return cfg, errors.New("error invalid response buffer size")
-		}
 	}
 	return cfg, nil

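As a rough, self-contained sketch of the FN_MAX_LOG_SIZE validation added above (the function name here is illustrative, not from the commit):

package main

import (
	"errors"
	"fmt"
	"math"
	"os"
	"strconv"
)

// parseMaxLogSize mirrors the handling in the config diff: parse an unsigned
// integer from the environment and reject values that could overflow later
// uint64 to int conversions.
func parseMaxLogSize(def uint64) (uint64, error) {
	tmp := os.Getenv("FN_MAX_LOG_SIZE")
	if tmp == "" {
		return def, nil
	}
	v, err := strconv.ParseUint(tmp, 10, 64)
	if err != nil {
		return def, errors.New("error initializing max log size")
	}
	if v > math.MaxInt32 {
		return def, fmt.Errorf("error invalid max log size %v > %v", v, math.MaxInt32)
	}
	return v, nil
}

func main() {
	os.Setenv("FN_MAX_LOG_SIZE", "65536") // example: cap per-call logs at 64 KiB
	size, err := parseMaxLogSize(1 * 1024 * 1024)
	fmt.Println(size, err) // 65536 <nil>
}
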
@@ -145,6 +145,8 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
 			Image:        task.Image(),
 			Volumes:      map[string]struct{}{},
 			OpenStdin:    true,
+			AttachStdout: true,
+			AttachStderr: true,
 			AttachStdin:  true,
 			StdinOnce:    true,
 		},

@@ -2,7 +2,6 @@ package agent
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -18,7 +17,7 @@ var (
 // setupLogger returns an io.ReadWriteCloser which may write to multiple io.Writer's,
 // and may be read from the returned io.Reader (singular). After Close is called,
 // the Reader is not safe to read from, nor the Writer to write to.
-func setupLogger(logger logrus.FieldLogger) io.ReadWriteCloser {
+func setupLogger(logger logrus.FieldLogger, maxSize uint64) io.ReadWriteCloser {
 	lbuf := bufPool.Get().(*bytes.Buffer)
 	dbuf := logPool.Get().(*bytes.Buffer)
@@ -34,10 +33,8 @@ func setupLogger(logger logrus.FieldLogger) io.ReadWriteCloser {
 	// we don't need to limit the log writer, but we do need it to dispense lines
 	linew := newLineWriterWithBuffer(lbuf, &logWriter{logger})
-	const MB = 1 * 1024 * 1024 // pick a number any number.. TODO configurable ?
-
 	// we don't need to log per line to db, but we do need to limit it
-	limitw := &nopCloser{newLimitWriter(MB, dbuf)}
+	limitw := &nopCloser{newLimitWriter(int(maxSize), dbuf)}
 	// TODO / NOTE: we want linew to be first because limitw may error if limit
 	// is reached but we still want to log. we should probably ignore hitting the
@@ -170,28 +167,38 @@ func (li *lineWriter) Close() error {
 // io.Writer that allows limiting bytes written to w
 // TODO change to use clamp writer, this is dupe code
-type limitWriter struct {
+type limitDiscardWriter struct {
 	n, max int
 	io.Writer
 }
 func newLimitWriter(max int, w io.Writer) io.Writer {
-	return &limitWriter{max: max, Writer: w}
+	return &limitDiscardWriter{max: max, Writer: w}
 }
-func (l *limitWriter) Write(b []byte) (int, error) {
+func (l *limitDiscardWriter) Write(b []byte) (int, error) {
+	inpLen := len(b)
 	if l.n >= l.max {
-		return 0, errors.New("max log size exceeded, truncating log")
+		return inpLen, nil
 	}
-	if l.n+len(b) >= l.max {
+
+	if l.n+inpLen >= l.max {
 		// cut off to prevent gigantic line attack
 		b = b[:l.max-l.n]
 	}
 	n, err := l.Writer.Write(b)
 	l.n += n
 	if l.n >= l.max {
 		// write in truncation message to log once
		l.Writer.Write([]byte(fmt.Sprintf("\n-----max log size %d bytes exceeded, truncating log-----\n", l.max)))
-	}
-	return n, err
+	} else if n != len(b) {
+		// Is this truly a partial write? We'll be honest if that's the case.
+		return n, err
+	}
+
+	// yes, we lie... this is to prevent callers to blow up, we always pretend
+	// that we were able to write the entire buffer.
+	return inpLen, err
 }

@@ -146,16 +146,17 @@ func TestRouteRunnerPost(t *testing.T) {
 	}
 }
-// removing this flappy test pending on resolution of
-func skipTestRouteRunnerIOPipes(t *testing.T) {
+func TestRouteRunnerIOPipes(t *testing.T) {
 	buf := setLogBuffer()
 	isFailure := false
 	// let's make freezer immediate, so that we don't deal with
 	// more timing related issues below. Slightly gains us a bit more
 	// determinism.
-	tweaker := envTweaker("FN_FREEZE_IDLE_MSECS", "0")
-	defer tweaker()
+	tweaker1 := envTweaker("FN_FREEZE_IDLE_MSECS", "0")
+	tweaker2 := envTweaker("FN_MAX_LOG_SIZE", "5")
+	defer tweaker1()
+	defer tweaker2()
 	// Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.)
 	// for figuring out why things failed.
@@ -188,8 +189,6 @@ func skipTestRouteRunnerIOPipes(t *testing.T) {
 	delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1000}`
 	ok := `{"isDebug": true}`
-	//multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
-
 	containerIds := make([]string, 0)
 	for i, test := range []struct {
@@ -276,7 +275,6 @@ func skipTestRouteRunnerIOPipes(t *testing.T) {
 		t.Logf("Test %d: dockerId: %v", i, containerIds[i])
 		time.Sleep(test.sleepAmount)
 	}
-
 	jsonIds := containerIds[0:4]
 	// now cross check JSON container ids: