From 7677aad450f531ebccc63236f5af5539dd137927 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Wed, 7 Mar 2018 15:09:24 -0800 Subject: [PATCH] fn: I/O related improvements (#809) *) I/O protocol parse issues should shutdown the container as the container goes to inconsistent state between calls. (eg. next call may receive previous calls left overs.) *) Move ghost read/write code into io_utils in common. *) Clean unused error from docker Wait() *) We can catch one case in JSON, if there's remaining unparsed data in decoder buffer, we can shut the container *) stdout/stderr when container is not handling a request are now blocked if freezer is also enabled. *) if a fatal err is set for slot, we do not requeue it and proceed to shutdown *) added a test function for a few cases with freezer strict behavior --- api/agent/agent.go | 218 ++++++------------ api/agent/config.go | 6 +- api/agent/drivers/docker/docker.go | 4 +- api/agent/drivers/docker/docker_test.go | 12 +- api/agent/drivers/driver.go | 2 +- api/agent/drivers/mock/mocker.go | 6 +- api/agent/protocol/factory.go | 2 + api/agent/protocol/json.go | 17 +- api/common/io_utils.go | 159 +++++++++++++ api/server/runner_test.go | 283 ++++++++++++++++++++---- images/fn-test-utils/fn-test-utils.go | 61 ++++- 11 files changed, 568 insertions(+), 202 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 1dcc7dca1..d7ace2daa 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -443,13 +443,13 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { // implements Slot type coldSlot struct { - cookie drivers.Cookie - tok ResourceToken - err error + cookie drivers.Cookie + tok ResourceToken + fatalErr error } func (s *coldSlot) Error() error { - return s.err + return s.fatalErr } func (s *coldSlot) exec(ctx context.Context, call *call) error { @@ -464,10 +464,8 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error { return err } - res, err := waiter.Wait(ctx) - if err != nil { - return err - } else if res.Error() != nil { + res := waiter.Wait(ctx) + if res.Error() != nil { // check for call error (oom/exit) and beam it up return res.Error() } @@ -496,7 +494,7 @@ type hotSlot struct { errC <-chan error // container error container *container // TODO mask this maxRespSize uint64 // TODO boo. 
- err error + fatalErr error } func (s *hotSlot) Close(ctx context.Context) error { @@ -505,7 +503,7 @@ func (s *hotSlot) Close(ctx context.Context) error { } func (s *hotSlot) Error() error { - return s.err + return s.fatalErr } func (s *hotSlot) exec(ctx context.Context, call *call) error { @@ -542,6 +540,15 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { case err := <-s.errC: // error from container return err case err := <-errApp: // from dispatch + if s.fatalErr == nil && err != nil { + if models.IsAPIError(err) { + s.fatalErr = err + } else if err == protocol.ErrExcessData { + s.fatalErr = err + // suppress excess data error, but do shutdown the container + return nil + } + } return err case <-ctx.Done(): // call timeout return ctx.Err() @@ -604,42 +611,25 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state state.UpdateState(ctx, ContainerStateStart, call.slots) defer state.UpdateState(ctx, ContainerStateDone, call.slots) - cid := id.New().String() - - // set up the stderr to capture any logs before the slot is executed and - // between hot functions - stderr := newLineWriter(&logWriter{ - logrus.WithFields(logrus.Fields{"between_log": true, "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": cid}), - }) - - // between calls we need a reader that doesn't do anything - stdin := &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)} - defer stdin.Close() - - container := &container{ - id: cid, // XXX we could just let docker generate ids... - image: call.Image, - env: map[string]string(call.Config), - memory: call.Memory, - cpus: uint64(call.CPUs), - stdin: stdin, - stdout: &ghostWriter{inner: stderr}, - stderr: &ghostWriter{inner: stderr}, - } + // if freezer is enabled, be consistent with freezer behavior and + // block stdout and stderr between calls. + isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdleMsecs + container, closer := NewHotContainer(call, isBlockIdleIO) + defer closer() logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout}) ctx = common.WithLogger(ctx, logger) cookie, err := a.driver.Prepare(ctx, container) if err != nil { - call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) + call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err}) return } defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out waiter, err := cookie.Run(ctx) if err != nil { - call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) + call.slots.queueSlot(&hotSlot{done: make(chan struct{}), fatalErr: err}) return } @@ -670,18 +660,20 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state // wait for this call to finish // NOTE do NOT select with shutdown / other channels. slot handles this. <-slot.done + + if slot.fatalErr != nil { + logger.WithError(slot.fatalErr).Info("hot function terminating") + return + } } }() - res, err := waiter.Wait(ctx) - if err != nil { - errC <- err - } else if res.Error() != nil { - err = res.Error() - errC <- err + res := waiter.Wait(ctx) + if res.Error() != nil { + errC <- res.Error() // TODO: race condition, no guaranteed delivery fix this... 
} - logger.WithError(err).Info("hot function terminated") + logger.WithError(res.Error()).Info("hot function terminated") } // runHotReq enqueues a free slot to slot queue manager and watches various timers and the consumer until @@ -790,11 +782,45 @@ type container struct { stats *drivers.Stats } +func NewHotContainer(call *call, isBlockIdleIO bool) (*container, func()) { + + id := id.New().String() + + stdin := common.NewGhostReader() + stderr := common.NewGhostWriter() + stdout := common.NewGhostWriter() + + // when not processing a request, do we block IO? + if !isBlockIdleIO { + stderr.Swap(newLineWriter(&logWriter{ + logrus.WithFields(logrus.Fields{"tag": "stderr", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}), + })) + stdout.Swap(newLineWriter(&logWriter{ + logrus.WithFields(logrus.Fields{"tag": "stdout", "app_name": call.AppName, "path": call.Path, "image": call.Image, "container_id": id}), + })) + } + + return &container{ + id: id, // XXX we could just let docker generate ids... + image: call.Image, + env: map[string]string(call.Config), + memory: call.Memory, + cpus: uint64(call.CPUs), + stdin: stdin, + stdout: stdout, + stderr: stderr, + }, func() { + stdin.Close() + stderr.Close() + stdout.Close() + } +} + func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() { // if tests don't catch this, then fuck me - ostdin := c.stdin.(*ghostReader).swap(stdin) - ostdout := c.stdout.(*ghostWriter).swap(stdout) - ostderr := c.stderr.(*ghostWriter).swap(stderr) + ostdin := c.stdin.(common.GhostReader).Swap(stdin) + ostdout := c.stdout.(common.GhostWriter).Swap(stdout) + ostderr := c.stderr.(common.GhostWriter).Swap(stderr) c.statsMu.Lock() ocs := c.stats @@ -802,9 +828,9 @@ func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers. c.statsMu.Unlock() return func() { - c.stdin.(*ghostReader).swap(ostdin) - c.stdout.(*ghostWriter).swap(ostdout) - c.stderr.(*ghostWriter).swap(ostderr) + c.stdin.(common.GhostReader).Swap(ostdin) + c.stdout.(common.GhostWriter).Swap(ostdout) + c.stderr.(common.GhostWriter).Swap(ostderr) c.statsMu.Lock() c.stats = ocs c.statsMu.Unlock() @@ -880,101 +906,3 @@ func init() { // Implementing the docker.AuthConfiguration interface. // TODO per call could implement this stored somewhere (vs. configured on host) //} - -// ghostWriter is an io.Writer who will pass writes to an inner writer -// that may be changed at will. it is thread safe to swap or write. -type ghostWriter struct { - sync.Mutex - inner io.Writer -} - -func (g *ghostWriter) swap(w io.Writer) (old io.Writer) { - g.Lock() - old = g.inner - g.inner = w - g.Unlock() - return old -} - -func (g *ghostWriter) Write(b []byte) (int, error) { - // we don't need to serialize writes but swapping g.inner could be a race if unprotected - g.Lock() - w := g.inner - g.Unlock() - n, err := w.Write(b) - if err == io.ErrClosedPipe { - // NOTE: we need to mask this error so that docker does not get an error - // from writing the output stream and shut down the container. - err = nil - } - return n, err -} - -// ghostReader is an io.ReadCloser who will pass reads to an inner reader -// that may be changed at will. it is thread safe to swap or read. -// Read will wait for a 'real' reader if inner is of type *waitReader. -// Close must be called to prevent any pending readers from leaking. 
-type ghostReader struct { - cond *sync.Cond - inner io.Reader - closed bool -} - -func (g *ghostReader) swap(r io.Reader) (old io.Reader) { - g.cond.L.Lock() - old = g.inner - g.inner = r - g.cond.L.Unlock() - g.cond.Broadcast() - return old -} - -func (g *ghostReader) Close() { - g.cond.L.Lock() - g.closed = true - g.cond.L.Unlock() - g.cond.Broadcast() -} - -func (g *ghostReader) awaitRealReader() (io.Reader, bool) { - // wait for a real reader - g.cond.L.Lock() - for { - if g.closed { // check this first - g.cond.L.Unlock() - return nil, false - } - if _, ok := g.inner.(*waitReader); ok { - g.cond.Wait() - } else { - break - } - } - - // we don't need to serialize reads but swapping g.inner could be a race if unprotected - r := g.inner - g.cond.L.Unlock() - return r, true -} - -func (g *ghostReader) Read(b []byte) (int, error) { - r, ok := g.awaitRealReader() - if !ok { - return 0, io.EOF - } - - n, err := r.Read(b) - if err == io.ErrClosedPipe { - // NOTE: we need to mask this error so that docker does not get an error - // from reading the input stream and shut down the container. - err = nil - } - return n, err -} - -// waitReader returns io.EOF if anyone calls Read. don't call Read, this is a sentinel type -type waitReader struct{} - -func (e *waitReader) Read([]byte) (int, error) { - panic("read on waitReader should not happen") -} diff --git a/api/agent/config.go b/api/agent/config.go index e1b3fbf83..2a4f2a049 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -15,6 +15,8 @@ type AgentConfig struct { MaxResponseSize uint64 `json:"max_response_size"` } +var MaxDisabledMsecs = time.Duration(math.MaxInt64) + func NewAgentConfig() (*AgentConfig, error) { var err error @@ -60,8 +62,8 @@ func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) { return defaultVal, err } // disable if negative or set to msecs specified. 
- if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond { - delay = math.MaxInt64 + if durInt < 0 || time.Duration(durInt) >= MaxDisabledMsecs/time.Millisecond { + delay = MaxDisabledMsecs } else { delay = time.Duration(durInt) * time.Millisecond } diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index 95b3b9ad1..d757f42e0 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -384,7 +384,7 @@ type waitResult struct { } // waitResult implements drivers.WaitResult -func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) { +func (w *waitResult) Wait(ctx context.Context) drivers.RunResult { defer close(w.done) // wait until container is stopped (or ctx is cancelled if sooner) @@ -392,7 +392,7 @@ func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) { return &runResult{ status: status, err: err, - }, nil + } } // Repeatedly collect stats from the specified docker container until the stopSignal is closed or the context is cancelled diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index 2f9bccfc1..1192d6998 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -56,9 +56,9 @@ func TestRunnerDocker(t *testing.T) { t.Fatal(err) } - result, err := waiter.Wait(ctx) - if err != nil { - t.Fatal(err) + result := waiter.Wait(ctx) + if result.Error() != nil { + t.Fatal(result.Error()) } if result.Status() != "success" { @@ -109,9 +109,9 @@ func TestRunnerDockerStdin(t *testing.T) { t.Fatal(err) } - result, err := waiter.Wait(ctx) - if err != nil { - t.Fatal(err) + result := waiter.Wait(ctx) + if result.Error() != nil { + t.Fatal(result.Error()) } if result.Status() != "success" { diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index 91a4fd2fe..0d7dba78a 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -47,7 +47,7 @@ type WaitResult interface { // provided context is canceled and the container does not return first, the // resulting status will be 'canceled'. If the provided context times out // then the resulting status will be 'timeout'. 
- Wait(context.Context) (RunResult, error) + Wait(context.Context) RunResult } type Driver interface { diff --git a/api/agent/drivers/mock/mocker.go b/api/agent/drivers/mock/mocker.go index d73e4e7e0..79415af7e 100644 --- a/api/agent/drivers/mock/mocker.go +++ b/api/agent/drivers/mock/mocker.go @@ -52,6 +52,6 @@ type runResult struct { start time.Time } -func (r *runResult) Wait(context.Context) (drivers.RunResult, error) { return r, nil } -func (r *runResult) Status() string { return r.status } -func (r *runResult) Error() error { return r.err } +func (r *runResult) Wait(context.Context) drivers.RunResult { return r } +func (r *runResult) Status() string { return r.status } +func (r *runResult) Error() error { return r.err } diff --git a/api/agent/protocol/factory.go b/api/agent/protocol/factory.go index ddb191240..9a35d9c36 100644 --- a/api/agent/protocol/factory.go +++ b/api/agent/protocol/factory.go @@ -13,6 +13,8 @@ import ( var errInvalidProtocol = errors.New("Invalid Protocol") +var ErrExcessData = errors.New("Excess data in stream") + type errorProto struct { error } diff --git a/api/agent/protocol/json.go b/api/agent/protocol/json.go index 27960ddcd..eb0b911b9 100644 --- a/api/agent/protocol/json.go +++ b/api/agent/protocol/json.go @@ -100,7 +100,8 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e _, span = trace.StartSpan(ctx, "dispatch_json_read_response") var jout jsonOut - err = json.NewDecoder(h.out).Decode(&jout) + decoder := json.NewDecoder(h.out) + err = decoder.Decode(&jout) span.End() if err != nil { return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err)) @@ -112,7 +113,8 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e rw, ok := w.(http.ResponseWriter) if !ok { // logs can just copy the full thing in there, headers and all. - return json.NewEncoder(w).Encode(jout) + err := json.NewEncoder(w).Encode(jout) + return h.isExcessData(err, decoder) } // this has to be done for pulling out: @@ -141,5 +143,16 @@ func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e } _, err = io.WriteString(rw, jout.Body) + return h.isExcessData(err, decoder) +} + +func (h *JSONProtocol) isExcessData(err error, decoder *json.Decoder) error { + if err == nil { + // Now check for excess output, if this is the case, we can be certain that the next request will fail. + tmp, ok := decoder.Buffered().(*bytes.Reader) + if ok && tmp.Len() > 0 { + return ErrExcessData + } + } return err } diff --git a/api/common/io_utils.go b/api/common/io_utils.go index 07e7d8224..5fa88ad0b 100644 --- a/api/common/io_utils.go +++ b/api/common/io_utils.go @@ -2,6 +2,7 @@ package common import ( "io" + "sync" ) type clampWriter struct { @@ -32,3 +33,161 @@ func (g *clampWriter) Write(p []byte) (int, error) { } return n, err } + +type GhostWriter interface { + io.Writer + io.Closer + Swap(r io.Writer) io.Writer +} + +// ghostWriter is an io.Writer who will pass writes to an inner writer +// that may be changed at will. it is thread safe to swap or write. 
+type ghostWriter struct { + cond *sync.Cond + inner io.Writer + closed bool +} + +func NewGhostWriter() GhostWriter { + return &ghostWriter{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitWriter)} +} + +func (g *ghostWriter) Swap(w io.Writer) (old io.Writer) { + g.cond.L.Lock() + old = g.inner + g.inner = w + g.cond.L.Unlock() + g.cond.Broadcast() + return old +} + +func (g *ghostWriter) Close() error { + g.cond.L.Lock() + g.closed = true + g.cond.L.Unlock() + g.cond.Broadcast() + return nil +} + +func (g *ghostWriter) awaitRealWriter() (io.Writer, bool) { + // wait for a real writer + g.cond.L.Lock() + for { + if g.closed { // check this first + g.cond.L.Unlock() + return nil, false + } + if _, ok := g.inner.(*waitWriter); ok { + g.cond.Wait() + } else { + break + } + } + + // we don't need to serialize writes but swapping g.inner could be a race if unprotected + w := g.inner + g.cond.L.Unlock() + return w, true +} + +func (g *ghostWriter) Write(b []byte) (int, error) { + w, ok := g.awaitRealWriter() + if !ok { + return 0, io.EOF + } + + n, err := w.Write(b) + if err == io.ErrClosedPipe { + // NOTE: we need to mask this error so that docker does not get an error + // from writing the input stream and shut down the container. + err = nil + } + return n, err +} + +type GhostReader interface { + io.Reader + io.Closer + Swap(r io.Reader) io.Reader +} + +// ghostReader is an io.ReadCloser who will pass reads to an inner reader +// that may be changed at will. it is thread safe to swap or read. +// Read will wait for a 'real' reader if inner is of type *waitReader. +// Close must be called to prevent any pending readers from leaking. +type ghostReader struct { + cond *sync.Cond + inner io.Reader + closed bool +} + +func NewGhostReader() GhostReader { + return &ghostReader{cond: sync.NewCond(new(sync.Mutex)), inner: new(waitReader)} +} + +func (g *ghostReader) Swap(r io.Reader) (old io.Reader) { + g.cond.L.Lock() + old = g.inner + g.inner = r + g.cond.L.Unlock() + g.cond.Broadcast() + return old +} + +func (g *ghostReader) Close() error { + g.cond.L.Lock() + g.closed = true + g.cond.L.Unlock() + g.cond.Broadcast() + return nil +} + +func (g *ghostReader) awaitRealReader() (io.Reader, bool) { + // wait for a real reader + g.cond.L.Lock() + for { + if g.closed { // check this first + g.cond.L.Unlock() + return nil, false + } + if _, ok := g.inner.(*waitReader); ok { + g.cond.Wait() + } else { + break + } + } + + // we don't need to serialize reads but swapping g.inner could be a race if unprotected + r := g.inner + g.cond.L.Unlock() + return r, true +} + +func (g *ghostReader) Read(b []byte) (int, error) { + r, ok := g.awaitRealReader() + if !ok { + return 0, io.EOF + } + + n, err := r.Read(b) + if err == io.ErrClosedPipe { + // NOTE: we need to mask this error so that docker does not get an error + // from reading the input stream and shut down the container. + err = nil + } + return n, err +} + +// waitReader returns io.EOF if anyone calls Read. don't call Read, this is a sentinel type +type waitReader struct{} + +func (e *waitReader) Read([]byte) (int, error) { + panic("read on waitReader should not happen") +} + +// waitWriter returns io.EOF if anyone calls Write. 
don't call Write, this is a sentinel type +type waitWriter struct{} + +func (e *waitWriter) Write([]byte) (int, error) { + panic("write on waitWriter should not happen") +} diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 9981880cb..2c79cea24 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -3,6 +3,7 @@ package server import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io/ioutil" @@ -10,6 +11,7 @@ import ( "os" "strings" "testing" + "time" "github.com/fnproject/fn/api/agent" "github.com/fnproject/fn/api/datastore" @@ -18,6 +20,27 @@ import ( "github.com/fnproject/fn/api/mqs" ) +func envTweaker(name, value string) func() { + bck, ok := os.LookupEnv(name) + + err := os.Setenv(name, value) + if err != nil { + panic(err.Error()) + } + + return func() { + var err error + if !ok { + err = os.Unsetenv(name) + } else { + err = os.Setenv(name, bck) + } + if err != nil { + panic(err.Error()) + } + } +} + func testRunner(t *testing.T, args ...interface{}) (agent.Agent, context.CancelFunc) { ds := datastore.NewMock() var mq models.MessageQueue = &mqs.Mock{} @@ -123,9 +146,169 @@ func TestRouteRunnerPost(t *testing.T) { } } +func TestRouteRunnerIOPipes(t *testing.T) { + buf := setLogBuffer() + isFailure := false + + // let's make freezer immediate, so that we don't deal with + // more timing related issues below. Slightly gains us a bit more + // determinism. + tweaker := envTweaker("FN_FREEZE_IDLE_MSECS", "0") + defer tweaker() + + // Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.) + // for figuring out why things failed. + defer func() { + if isFailure { + t.Log(buf.String()) + } + }() + + rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer + rImg := "fnproject/fn-test-utils" + + ds := datastore.NewMockInit( + []*models.App{ + {Name: "zoo", Config: models.Config{}}, + }, + []*models.Route{ + {Path: "/json", AppName: "zoo", Image: rImg, Type: "sync", Format: "json", Memory: 64, Timeout: 30, IdleTimeout: 30, Config: rCfg}, + {Path: "/http", AppName: "zoo", Image: rImg, Type: "sync", Format: "http", Memory: 64, Timeout: 30, IdleTimeout: 30, Config: rCfg}, + }, nil, + ) + + rnr, cancelrnr := testRunner(t, ds) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull) + + // sleep between logs and with debug enabled, fn-test-utils will log header/footer below: + immediateGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 0}` + delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1000}` + ok := `{"isDebug": true}` + + //multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"} + + containerIds := make([]string, 0) + + for i, test := range []struct { + path string + body string + method string + expectedCode int + expectedErrSubStr string + expectedLogsSubStr []string + sleepAmount time.Duration + }{ + // + // JSON WORLD + // + // CASE I: immediate garbage: likely to be in the json decoder buffer after json resp parsing + {"/r/zoo/json/", immediateGarbage, "GET", http.StatusOK, "", nil, 0}, + + // CASE II: delayed garbage: make sure delayed output lands in between request processing, should be blocked until next req + {"/r/zoo/json/", delayedGarbage, "GET", http.StatusOK, "", nil, time.Second * 2}, + + // CASE III: normal, but should get faulty I/O from previous + {"/r/zoo/json/", ok, "GET", http.StatusBadGateway, "invalid json", nil, 
0}, + + // CASE IV: should land on CASE III container + {"/r/zoo/json/", ok, "GET", http.StatusOK, "", nil, 0}, + + // + // HTTP WORLD + // + // CASE I: immediate garbage: should be ignored (TODO: this should test immediateGarbage case, FIX THIS) + {"/r/zoo/http", ok, "GET", http.StatusOK, "", nil, 0}, + + // CASE II: delayed garbage: make sure delayed output lands in between request processing, freezer should block, + // bad IO lands on next request. + {"/r/zoo/http", delayedGarbage, "GET", http.StatusOK, "", nil, time.Second * 2}, + + // CASE III: normal, but should not land on any container from case I/II. + {"/r/zoo/http/", ok, "GET", http.StatusBadGateway, "invalid http", nil, 0}, + + // CASE IV: should land on CASE III container + {"/r/zoo/http/", ok, "GET", http.StatusOK, "", nil, 0}, + } { + body := strings.NewReader(test.body) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + respBytes, _ := ioutil.ReadAll(rec.Body) + respBody := string(respBytes) + maxBody := len(respBody) + if maxBody > 1024 { + maxBody = 1024 + } + + containerIds = append(containerIds, "N/A") + + if rec.Code != test.expectedCode { + isFailure = true + t.Errorf("Test %d: Expected status code to be %d but was %d. body: %s", + i, test.expectedCode, rec.Code, respBody[:maxBody]) + } + + if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) { + isFailure = true + t.Errorf("Test %d: Expected response to include %s but got body: %s", + i, test.expectedErrSubStr, respBody[:maxBody]) + + } + + if test.expectedLogsSubStr != nil { + callID := rec.Header().Get("Fn_call_id") + if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) { + isFailure = true + } + } + + if rec.Code == http.StatusOK { + dockerId, err := getDockerId(respBytes) + if err != nil { + isFailure = true + t.Errorf("Test %d: cannot fetch docker id body: %s", + i, respBody[:maxBody]) + } + containerIds[i] = dockerId + } + + t.Logf("Test %d: dockerId: %v", i, containerIds[i]) + time.Sleep(test.sleepAmount) + } + + jsonIds := containerIds[0:4] + + // now cross check JSON container ids: + if jsonIds[0] != jsonIds[1] && + jsonIds[2] == "N/A" && + jsonIds[1] != jsonIds[2] && + jsonIds[2] != jsonIds[3] { + t.Logf("json container ids are OK, ids=%v", jsonIds) + } else { + isFailure = true + t.Errorf("json container ids are not OK, ids=%v", jsonIds) + } + + httpids := containerIds[4:] + + // now cross check HTTP container ids: + if httpids[0] == httpids[1] && + httpids[2] == "N/A" && + httpids[1] != httpids[2] && + httpids[2] != httpids[3] { + t.Logf("http container ids are OK, ids=%v", httpids) + } else { + isFailure = true + t.Errorf("http container ids are not OK, ids=%v", httpids) + } +} + func TestRouteRunnerExecution(t *testing.T) { buf := setLogBuffer() isFailure := false + tweaker := envTweaker("FN_MAX_RESPONSE_SIZE", "2048") + defer tweaker() + // Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.) // for figuring out why things failed. 
defer func() { @@ -140,12 +323,6 @@ func TestRouteRunnerExecution(t *testing.T) { rImgBs1 := "fnproject/imagethatdoesnotexist" rImgBs2 := "localhost:5000/fnproject/imagethatdoesnotexist" - err := os.Setenv("FN_MAX_RESPONSE_SIZE", "2048") - if err != nil { - t.Errorf("Cannot set response size %v", err) - } - defer os.Setenv("FN_MAX_RESPONSE_SIZE", "") - ds := datastore.NewMockInit( []*models.App{ {Name: "myapp", Config: models.Config{}}, @@ -182,10 +359,10 @@ func TestRouteRunnerExecution(t *testing.T) { respTypeJason := `{"jasonContentType": "foo/bar", "isDebug": true}` // Content-Type: foo/bar // sleep between logs and with debug enabled, fn-test-utils will log header/footer below: - multiLog := `{"sleepTime": 1, "isDebug": true}` + multiLog := `{"sleepTime": 1000, "isDebug": true}` multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"} - bigoutput := `{"sleepTime": 0, "isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1000}` // 1000 trailers to exceed 2K - smalloutput := `{"sleepTime": 0, "isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1}` // 1 trailer < 2K + bigoutput := `{"isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1000}` // 1000 trailers to exceed 2K + smalloutput := `{"isDebug": true, "echoContent": "repeatme", "trailerRepeat": 1}` // 1 trailer < 2K for i, test := range []struct { path string @@ -199,17 +376,17 @@ func TestRouteRunnerExecution(t *testing.T) { {"/r/myapp/", ok, "GET", http.StatusOK, expHeaders, "", nil}, {"/r/myapp/myhot", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid http response", nil}, - // hot container now back to normal, we should get OK + // hot container now back to normal: {"/r/myapp/myhot", ok, "GET", http.StatusOK, expHeaders, "", nil}, + {"/r/myapp/myhotjason", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid json response", nil}, + // hot container now back to normal: {"/r/myapp/myhotjason", ok, "GET", http.StatusOK, expHeaders, "", nil}, {"/r/myapp/myhot", respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, {"/r/myapp/myhotjason", respTypeLie, "GET", http.StatusOK, expCTHeaders, "", nil}, {"/r/myapp/myhotjason", respTypeJason, "GET", http.StatusOK, expCTHeaders, "", nil}, - {"/r/myapp/myhotjason", badHot, "GET", http.StatusBadGateway, expHeaders, "invalid json response", nil}, - {"/r/myapp/myroute", ok, "GET", http.StatusOK, expHeaders, "", nil}, {"/r/myapp/myerror", crasher, "GET", http.StatusBadGateway, expHeaders, "container exit code 2", nil}, {"/r/myapp/mydne", ``, "GET", http.StatusNotFound, nil, "pull access denied", nil}, @@ -261,37 +438,69 @@ func TestRouteRunnerExecution(t *testing.T) { if test.expectedLogsSubStr != nil { callID := rec.Header().Get("Fn_call_id") - - logReader, err := ds.GetLog(context.Background(), "myapp", callID) - if err != nil { + if !checkLogs(t, i, ds, callID, test.expectedLogsSubStr) { isFailure = true - t.Errorf("Test %d: GetLog for call_id:%s returned err %s", - i, callID, err.Error()) - } else { - logBytes, err := ioutil.ReadAll(logReader) - if err != nil { - isFailure = true - t.Errorf("Test %d: GetLog read IO call_id:%s returned err %s", - i, callID, err.Error()) - } else { - logBody := string(logBytes) - maxLog := len(logBody) - if maxLog > 1024 { - maxLog = 1024 - } - for _, match := range test.expectedLogsSubStr { - if !strings.Contains(logBody, match) { - isFailure = true - t.Errorf("Test %d: GetLog read IO call_id:%s cannot find: %s in logs: %s", - i, callID, match, logBody[:maxLog]) - } - } - } } } } } +func getDockerId(respBytes []byte) 
(string, error) { + + var respJs map[string]interface{} + var data map[string]interface{} + + err := json.Unmarshal(respBytes, &respJs) + if err != nil { + return "", err + } + + data, ok := respJs["data"].(map[string]interface{}) + if !ok { + return "", errors.New("unexpected json: data map") + } + + id, ok := data["DockerId"].(string) + if !ok { + return "", errors.New("unexpected json: docker id string") + } + + return id, nil +} + +func checkLogs(t *testing.T, tnum int, ds models.Datastore, callID string, expected []string) bool { + + logReader, err := ds.GetLog(context.Background(), "myapp", callID) + if err != nil { + t.Errorf("Test %d: GetLog for call_id:%s returned err %s", + tnum, callID, err.Error()) + return false + } + + logBytes, err := ioutil.ReadAll(logReader) + if err != nil { + t.Errorf("Test %d: GetLog read IO call_id:%s returned err %s", + tnum, callID, err.Error()) + return false + } + + logBody := string(logBytes) + maxLog := len(logBody) + if maxLog > 1024 { + maxLog = 1024 + } + + for _, match := range expected { + if !strings.Contains(logBody, match) { + t.Errorf("Test %d: GetLog read IO call_id:%s cannot find: %s in logs: %s", + tnum, callID, match, logBody[:maxLog]) + return false + } + } + + return true +} + // implement models.MQ and models.APIError type errorMQ struct { error diff --git a/images/fn-test-utils/fn-test-utils.go b/images/fn-test-utils/fn-test-utils.go index 1c7aefb7a..033de238b 100644 --- a/images/fn-test-utils/fn-test-utils.go +++ b/images/fn-test-utils/fn-test-utils.go @@ -54,6 +54,12 @@ type AppRequest struct { TrailerRepeat int `json:"trailerRepeat,omitempty"` // corrupt http or json InvalidResponse bool `json:"invalidResponse,omitempty"` + // if specified we 'sleep' the specified msecs *after* processing request + PostSleepTime int `json:"postSleepTime,omitempty"` + // spit this out in stdout after processing each request + PostOutGarbage string `json:"postOutGarbage,omitempty"` + // spit this out in stderr after processing each request + PostErrGarbage string `json:"postErrGarbage,omitempty"` // TODO: simulate slow read/slow write // TODO: simulate partial IO write/read // TODO: simulate high cpu usage (async and sync) @@ -101,6 +107,10 @@ func AppHandler(ctx context.Context, in io.Reader, out io.Writer) { var outto fdkresponse outto.Writer = out finalizeRequest(&outto, req, resp) + err := postProcessRequest(req, out) + if err != nil { + panic(err.Error()) + } } func finalizeRequest(out *fdkresponse, req *AppRequest, resp *AppResponse) { @@ -222,6 +232,37 @@ func processRequest(ctx context.Context, in io.Reader) (*AppRequest, *AppRespons return &request, &resp } +func postProcessRequest(request *AppRequest, out io.Writer) error { + if request == nil { + return nil + } + + if request.PostSleepTime > 0 { + if request.IsDebug { + log.Printf("PostProcess Sleeping %d", request.PostSleepTime) + } + time.Sleep(time.Duration(request.PostSleepTime) * time.Millisecond) + } + + if request.PostOutGarbage != "" { + if request.IsDebug { + log.Printf("PostProcess PostOutGarbage %s", request.PostOutGarbage) + } + + _, err := io.WriteString(out, request.PostOutGarbage) + if err != nil { + log.Printf("PostOutGarbage write string error %v", err) + return err + } + } + + if request.PostErrGarbage != "" { + log.Printf("PostProcess PostErrGarbage %s", request.PostErrGarbage) + } + + return nil +} + func main() { if os.Getenv("ENABLE_HEADER") != "" { log.Printf("Container starting") @@ -287,6 +328,7 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out 
io.Writer, buf *bytes resp.Header = hdr var jsonRequest fdkutils.JsonIn + var appRequest *AppRequest err := json.NewDecoder(in).Decode(&jsonRequest) if err != nil { // stdin now closed @@ -295,7 +337,11 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes return err } resp.Status = http.StatusInternalServerError - io.WriteString(resp, fmt.Sprintf(`{"error": %v}`, err.Error())) + _, err = io.WriteString(resp, fmt.Sprintf(`{"error": %v}`, err.Error())) + if err != nil { + log.Printf("json write string error %v", err) + return err + } } else { fdkutils.SetHeaders(ctx, jsonRequest.Protocol.Headers) ctx, cancel := fdkutils.CtxWithDeadline(ctx, jsonRequest.Deadline) @@ -308,6 +354,7 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes io.Copy(out, strings.NewReader(InvalidResponseStr)) } + appRequest = appReq } jsonResponse := getJSONResp(buf, &resp, &jsonRequest) @@ -324,7 +371,7 @@ func testDoJSONOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes return err } - return nil + return postProcessRequest(appRequest, out) } // since we need to test little jason's content type since he's special. but we @@ -355,6 +402,7 @@ func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes resp.Status = 200 resp.Header = hdr + var appRequest *AppRequest req, err := http.ReadRequest(bufio.NewReader(in)) if err != nil { // stdin now closed @@ -364,7 +412,11 @@ func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes } // TODO it would be nice if we could let the user format this response to their preferred style.. resp.Status = http.StatusInternalServerError - io.WriteString(resp, err.Error()) + _, err = io.WriteString(resp, err.Error()) + if err != nil { + log.Printf("http write string error %v", err) + return err + } } else { fnDeadline := fdkutils.Context(ctx).Header.Get("FN_DEADLINE") ctx, cancel := fdkutils.CtxWithDeadline(ctx, fnDeadline) @@ -378,6 +430,7 @@ func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes io.Copy(out, strings.NewReader(InvalidResponseStr)) } + appRequest = appReq } hResp := fdkutils.GetHTTPResp(buf, &resp.Response, req) @@ -388,7 +441,7 @@ func testDoHTTPOnce(ctx context.Context, in io.Reader, out io.Writer, buf *bytes return err } - return nil + return postProcessRequest(appRequest, out) } func getChunk(size int) []byte {
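
---

Reviewer notes: two standalone sketches of the behaviors this patch introduces. Names such as decodeOne, errExcessData and newGhostWriter are illustrative only; the real implementations live in api/agent/protocol/json.go and api/common/io_utils.go and differ in mechanics.

First, the excess-data check. After JSONProtocol.Dispatch decodes the function's response, any bytes still sitting in the json.Decoder's buffer belong to a second document the function should not have written, and they would otherwise leak into the next call on the same hot container. The sketch below makes the same check by draining Buffered() instead of type-asserting it to *bytes.Reader as the patch does; assume errExcessData stands in for protocol.ErrExcessData.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"strings"
)

// errExcessData stands in for protocol.ErrExcessData: leftover bytes after a
// successful Decode mean the container's stream can no longer be trusted.
var errExcessData = errors.New("excess data in stream")

// decodeOne decodes a single JSON document, then inspects the decoder's
// buffer for anything the function wrote beyond that document.
func decodeOne(in string, v interface{}) error {
	dec := json.NewDecoder(strings.NewReader(in))
	if err := dec.Decode(v); err != nil {
		return err
	}
	rest, _ := ioutil.ReadAll(dec.Buffered())
	if len(rest) > 0 {
		return errExcessData
	}
	return nil
}

func main() {
	var out map[string]interface{}
	fmt.Println(decodeOne(`{"body":"ok"}`, &out))                     // <nil>
	fmt.Println(decodeOne(`{"body":"ok"}YOGURT_YOGURT_YOGURT`, &out)) // excess data in stream
}

Second, blocking idle output. When the freezer is enabled, NewHotContainer leaves stdout/stderr attached to ghost writers with no real destination, so anything a function prints between requests blocks until the next call swaps in its own writer (or is dropped on close). The sketch below captures just that blocking-until-swap idea with a nil inner writer instead of the patch's waitWriter sentinel; it is a minimal model, not the common.GhostWriter API.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

type ghostWriter struct {
	mu     sync.Mutex
	cond   *sync.Cond
	inner  io.Writer // nil means "no real writer yet": writers must wait
	closed bool
}

func newGhostWriter() *ghostWriter {
	g := &ghostWriter{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Swap installs a real destination and wakes any blocked writers.
func (g *ghostWriter) Swap(w io.Writer) io.Writer {
	g.mu.Lock()
	old := g.inner
	g.inner = w
	g.mu.Unlock()
	g.cond.Broadcast()
	return old
}

// Close releases blocked writers; subsequent writes report io.EOF.
func (g *ghostWriter) Close() {
	g.mu.Lock()
	g.closed = true
	g.mu.Unlock()
	g.cond.Broadcast()
}

func (g *ghostWriter) Write(b []byte) (int, error) {
	g.mu.Lock()
	for g.inner == nil && !g.closed {
		g.cond.Wait() // block idle output until a request attaches a writer
	}
	w, closed := g.inner, g.closed
	g.mu.Unlock()
	if w == nil && closed {
		return 0, io.EOF
	}
	return w.Write(b)
}

func main() {
	g := newGhostWriter()
	done := make(chan struct{})
	go func() { g.Write([]byte("late output")); close(done) }() // blocks...
	var buf bytes.Buffer
	g.Swap(&buf) // ...until the next call swaps in its own buffer
	<-done
	fmt.Printf("%q\n", buf.String())
}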