From aa13a401682ce83ba1f57a1f11517d4ac26f15b4 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 14 Sep 2018 16:50:14 -0700 Subject: [PATCH] fn: agent/lb/runner error handling adjustments (#1214) 1) Early call validation and return due to cpu/mem impossible to meet (eg. request cpu/mem larger than max-mem or max-cpu on server) now emits HTTP Bad Request (400) instead of 503. This case is most likely due to client/service configuration and/or validation issue. 2) 'failed' metric is now removed. 'failed' versus 'errors' were too confusing. 'errors' is now a catch all error case. 3) new 'canceled' counter for client side cancels. 4) 'server_busy' now covers more cases than it previously did. --- api/agent/agent.go | 69 +++++++------------ api/agent/agent_test.go | 2 +- api/agent/call.go | 3 +- api/agent/lb_agent.go | 65 +++++++++-------- api/agent/stats.go | 67 +++++++++++------- api/models/error.go | 4 ++ api/server/runner_fninvoke_test.go | 4 +- api/server/runner_httptrigger_test.go | 4 +- api/server/runner_test.go | 4 +- .../fn-system-tests/exec_http_trigger_test.go | 32 --------- test/fn-system-tests/exec_route_test.go | 33 --------- 11 files changed, 115 insertions(+), 172 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index be0a95c28..c23153f64 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -244,15 +244,17 @@ func (a *agent) Close() error { } func (a *agent) Submit(callI Call) error { + call := callI.(*call) + ctx, span := trace.StartSpan(call.req.Context(), "agent_submit") + defer span.End() + + statsCalls(ctx) + if !a.shutWg.AddSession(1) { + statsTooBusy(ctx) return models.ErrCallTimeoutServerBusy } - - call := callI.(*call) - - ctx := call.req.Context() - ctx, span := trace.StartSpan(ctx, "agent_submit") - defer span.End() + defer a.shutWg.DoneSession() err := a.submit(ctx, call) return err @@ -294,7 +296,8 @@ func (a *agent) submit(ctx context.Context, call *call) error { return a.handleCallEnd(ctx, call, slot, err, false) } - statsDequeueAndStart(ctx) + statsDequeue(ctx) + statsStartRun(ctx) // We are about to execute the function, set container Exec Deadline (call.Timeout) slotCtx, cancel := context.WithTimeout(ctx, time.Duration(call.Timeout)*time.Second) @@ -314,52 +317,26 @@ func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err er // This means call was routed (executed) if isStarted { call.End(ctx, err) - } - - handleStatsEnd(ctx, err) - a.shutWg.DoneSession() - return transformTimeout(err, !isStarted) -} - -func transformTimeout(e error, isRetriable bool) error { - if e == context.DeadlineExceeded { - if isRetriable { + statsStopRun(ctx) + if err == nil { + statsComplete(ctx) + } + } else { + if err == CapacityFull || err == context.DeadlineExceeded { + statsTooBusy(ctx) return models.ErrCallTimeoutServerBusy } - return models.ErrCallTimeout - } else if e == CapacityFull { - return models.ErrCallTimeoutServerBusy } - return e -} -// handleStatsDequeue handles stats for dequeuing for early exit (getSlot or Start) -// cases. Only timeouts can be a simple dequeue while other cases are actual errors. -func handleStatsDequeue(ctx context.Context, err error) { if err == context.DeadlineExceeded { - statsDequeue(ctx) - statsTooBusy(ctx) - } else { - statsDequeueAndFail(ctx) + statsTimedout(ctx) + return models.ErrCallTimeout + } else if err == context.Canceled { + statsCanceled(ctx) + } else if err != nil { statsErrors(ctx) } -} - -// handleStatsEnd handles stats for after a call is ran, depending on error. -func handleStatsEnd(ctx context.Context, err error) { - if err == nil { - // decrement running count, increment completed count - statsComplete(ctx) - } else { - // decrement running count, increment failed count - statsFailed(ctx) - // increment the timeout or errors count, as appropriate - if err == context.DeadlineExceeded { - statsTimedout(ctx) - } else { - statsErrors(ctx) - } - } + return err } // getSlot returns a Slot (or error) for the request to run. Depending on hot/cold diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index f97475f0d..157618d25 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -581,7 +581,7 @@ func TestGetCallReturnsResourceImpossibility(t *testing.T) { defer checkClose(t, a) _, err := a.GetCall(FromModel(call)) - if err != models.ErrCallTimeoutServerBusy { + if err != models.ErrCallResourceTooBig { t.Fatal("did not get expected err, got: ", err) } } diff --git a/api/agent/call.go b/api/agent/call.go index f58e1cc38..bfd17264b 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -428,8 +428,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { mem := c.Memory + uint64(c.TmpFsSize) if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) { - // if we're not going to be able to run this call on this machine, bail here. - return nil, models.ErrCallTimeoutServerBusy + return nil, models.ErrCallResourceTooBig } setupCtx(&c) diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index bfbe64361..1aa489c15 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -146,22 +146,19 @@ func (a *lbAgent) Close() error { // implements Agent func (a *lbAgent) Submit(callI Call) error { - if !a.shutWg.AddSession(1) { - return models.ErrCallTimeoutServerBusy - } - call := callI.(*call) ctx, span := trace.StartSpan(call.req.Context(), "agent_submit") defer span.End() - statsEnqueue(ctx) + statsCalls(ctx) - err := call.Start(ctx) - if err != nil { - return a.handleCallEnd(ctx, call, err, false) + if !a.shutWg.AddSession(1) { + statsTooBusy(ctx) + return models.ErrCallTimeoutServerBusy } + defer a.shutWg.DoneSession() - statsDequeueAndStart(ctx) + statsEnqueue(ctx) // pre-read and buffer request body if already not done based // on GetBody presence. @@ -169,21 +166,19 @@ func (a *lbAgent) Submit(callI Call) error { if buf != nil { defer bufPool.Put(buf) } - if err != nil { - common.Logger(call.req.Context()).WithError(err).Error("Failed to process call body") - return a.handleCallEnd(ctx, call, err, true) + return a.handleCallEnd(ctx, call, err, false) } - // WARNING: isStarted (handleCallEnd) semantics - // need some consideration here. Similar to runner/agent - // we consider isCommitted true if call.Start() succeeds. - // isStarted=true means we will call Call.End(). + err = call.Start(ctx) + if err != nil { + return a.handleCallEnd(ctx, call, err, false) + } + + statsDequeue(ctx) + statsStartRun(ctx) + err = a.placer.PlaceCall(a.rp, ctx, call) - if err != nil { - common.Logger(call.req.Context()).WithError(err).Error("Failed to place call") - } - return a.handleCallEnd(ctx, call, err, true) } @@ -235,17 +230,33 @@ func (a *lbAgent) Enqueue(context.Context, *models.Call) error { return errors.New("Enqueue not implemented") } -func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isStarted bool) error { - - if isStarted { +func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isForwarded bool) error { + if isForwarded { call.End(ctx, err) - handleStatsEnd(ctx, err) + statsStopRun(ctx) + if err == nil { + statsComplete(ctx) + } } else { - handleStatsDequeue(ctx, err) + statsDequeue(ctx) + if err == context.DeadlineExceeded { + statsTooBusy(ctx) + return models.ErrCallTimeoutServerBusy + } } - a.shutWg.DoneSession() - return transformTimeout(err, !isStarted) + if err == models.ErrCallTimeoutServerBusy { + statsTooBusy(ctx) + return models.ErrCallTimeoutServerBusy + } else if err == context.DeadlineExceeded { + statsTimedout(ctx) + return models.ErrCallTimeout + } else if err == context.Canceled { + statsCanceled(ctx) + } else if err != nil { + statsErrors(ctx) + } + return err } var _ Agent = &lbAgent{} diff --git a/api/agent/stats.go b/api/agent/stats.go index 77f99deee..abee53101 100644 --- a/api/agent/stats.go +++ b/api/agent/stats.go @@ -12,38 +12,32 @@ import ( "go.opencensus.io/stats/view" ) -// TODO add some suga: -// * hot containers active -// * memory used / available - -func statsEnqueue(ctx context.Context) { - stats.Record(ctx, queuedMeasure.M(1)) +func statsCalls(ctx context.Context) { stats.Record(ctx, callsMeasure.M(1)) } -// Call when a function has been queued but cannot be started because of an error +func statsEnqueue(ctx context.Context) { + stats.Record(ctx, queuedMeasure.M(1)) +} + func statsDequeue(ctx context.Context) { stats.Record(ctx, queuedMeasure.M(-1)) } -func statsDequeueAndStart(ctx context.Context) { - stats.Record(ctx, queuedMeasure.M(-1)) +func statsStartRun(ctx context.Context) { stats.Record(ctx, runningMeasure.M(1)) } -func statsComplete(ctx context.Context) { +func statsStopRun(ctx context.Context) { stats.Record(ctx, runningMeasure.M(-1)) +} + +func statsComplete(ctx context.Context) { stats.Record(ctx, completedMeasure.M(1)) } -func statsFailed(ctx context.Context) { - stats.Record(ctx, runningMeasure.M(-1)) - stats.Record(ctx, failedMeasure.M(1)) -} - -func statsDequeueAndFail(ctx context.Context) { - stats.Record(ctx, queuedMeasure.M(-1)) - stats.Record(ctx, failedMeasure.M(1)) +func statsCanceled(ctx context.Context) { + stats.Record(ctx, canceledMeasure.M(1)) } func statsTimedout(ctx context.Context) { @@ -71,12 +65,36 @@ func statsContainerEvicted(ctx context.Context) { } const ( - // TODO we should probably prefix these with calls_ ? + // + // WARNING: Dual Role Metrics both used in Runner/Agent and LB-Agent + // + // LB Context: + // + // calls - call received in Agent Submit + // queued - LB is reading request from Client and attempting to validate/start + // running - LB is forwarding Call to runners + // completed - call completed running successfully + // canceled - call canceled (client disconnect) + // timeouts - call timed out + // errors - call failed + // server_busy - server busy responses (retriable) + // + // Agent/Runner Context: + // + // calls - calls received in Agent Submit + // queued - Reading/validating call from client and waiting for resources/containers to start + // running - call is now running + // completed - call completed running (success) + // canceled - call canceled (client disconnect) + // timeouts - call timed out + // errors - call failed + // server_busy - server busy responses (retriable) + // queuedMetricName = "queued" - callsMetricName = "calls" // TODO this is a dupe of sum {complete,failed} ? + callsMetricName = "calls" runningMetricName = "running" completedMetricName = "completed" - failedMetricName = "failed" + canceledMetricName = "canceled" timedoutMetricName = "timeouts" errorsMetricName = "errors" serverBusyMetricName = "server_busy" @@ -89,12 +107,11 @@ const ( ) var ( - queuedMeasure = common.MakeMeasure(queuedMetricName, "calls currently queued against agent", "") - // TODO this is a dupe of sum {complete,failed} ? + queuedMeasure = common.MakeMeasure(queuedMetricName, "calls currently queued against agent", "") callsMeasure = common.MakeMeasure(callsMetricName, "calls created in agent", "") runningMeasure = common.MakeMeasure(runningMetricName, "calls currently running in agent", "") completedMeasure = common.MakeMeasure(completedMetricName, "calls completed in agent", "") - failedMeasure = common.MakeMeasure(failedMetricName, "calls failed in agent", "") + canceledMeasure = common.MakeMeasure(canceledMetricName, "calls canceled in agent", "") timedoutMeasure = common.MakeMeasure(timedoutMetricName, "calls timed out in agent", "") errorsMeasure = common.MakeMeasure(errorsMetricName, "calls errored in agent", "") serverBusyMeasure = common.MakeMeasure(serverBusyMetricName, "calls where server was too busy in agent", "") @@ -127,7 +144,7 @@ func RegisterAgentViews(tagKeys []string, latencyDist []float64) { common.CreateView(callsMeasure, view.Sum(), tagKeys), common.CreateView(runningMeasure, view.Sum(), tagKeys), common.CreateView(completedMeasure, view.Sum(), tagKeys), - common.CreateView(failedMeasure, view.Sum(), tagKeys), + common.CreateView(canceledMeasure, view.Sum(), tagKeys), common.CreateView(timedoutMeasure, view.Sum(), tagKeys), common.CreateView(errorsMeasure, view.Sum(), tagKeys), common.CreateView(serverBusyMeasure, view.Sum(), tagKeys), diff --git a/api/models/error.go b/api/models/error.go index d64833e2e..b4c91a355 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -161,6 +161,10 @@ var ( code: http.StatusBadRequest, error: fmt.Errorf("memory value is out of range. It should be between 0 and %d", RouteMaxMemory), } + ErrCallResourceTooBig = err{ + code: http.StatusBadRequest, + error: fmt.Errorf("Requested CPU/Memory cannot be allocated"), + } ErrCallNotFound = err{ code: http.StatusNotFound, error: errors.New("Call not found"), diff --git a/api/server/runner_fninvoke_test.go b/api/server/runner_fninvoke_test.go index f827d667d..9232d953d 100644 --- a/api/server/runner_fninvoke_test.go +++ b/api/server/runner_fninvoke_test.go @@ -328,8 +328,8 @@ func TestInvokeRunnerTimeout(t *testing.T) { {"/invoke/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, {"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, {"/invoke/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, - {"/invoke/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, - {"/invoke/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, + {"/invoke/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, + {"/invoke/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, } { t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { trx := fmt.Sprintf("_trx_%d_", i) diff --git a/api/server/runner_httptrigger_test.go b/api/server/runner_httptrigger_test.go index 74cfdfaac..2f0ffe1bc 100644 --- a/api/server/runner_httptrigger_test.go +++ b/api/server/runner_httptrigger_test.go @@ -516,8 +516,8 @@ func TestTriggerRunnerTimeout(t *testing.T) { {"/t/myapp/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, {"/t/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, {"/t/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, - {"/t/myapp/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, - {"/t/myapp/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, + {"/t/myapp/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, + {"/t/myapp/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, } { t.Run(fmt.Sprintf("%d_%s", i, strings.Replace(test.path, "/", "_", -1)), func(t *testing.T) { trx := fmt.Sprintf("_trx_%d_", i) diff --git a/api/server/runner_test.go b/api/server/runner_test.go index fc23e6ff4..dda1c4755 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -386,8 +386,8 @@ func TestRouteRunnerTimeout(t *testing.T) { {"/r/myapp/hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, {"/r/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 5000, "isDebug": true}`, "POST", http.StatusGatewayTimeout, nil}, {"/r/myapp/hot-json", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusOK, nil}, - {"/r/myapp/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, - {"/r/myapp/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusServiceUnavailable, map[string][]string{"Retry-After": {"15"}}}, + {"/r/myapp/bigmem-cold", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, + {"/r/myapp/bigmem-hot", `{"echoContent": "_TRX_ID_", "sleepTime": 0, "isDebug": true}`, "POST", http.StatusBadRequest, nil}, } { trx := fmt.Sprintf("_trx_%d_", i) body := strings.NewReader(strings.Replace(test.body, "_TRX_ID_", trx, 1)) diff --git a/test/fn-system-tests/exec_http_trigger_test.go b/test/fn-system-tests/exec_http_trigger_test.go index 47d54abf1..1faad067b 100644 --- a/test/fn-system-tests/exec_http_trigger_test.go +++ b/test/fn-system-tests/exec_http_trigger_test.go @@ -299,38 +299,6 @@ func TestBasicTriggerConcurrentExecution(t *testing.T) { } -func TestTriggerSaturatedSystem(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) - defer cancel() - - app := ensureApp(t, rp.ValidApp()) - validFn := rp.ValidFn(app.ID) - validFn.ResourceConfig.Timeout = 1 - validFn.ResourceConfig.Memory = 300 - - fn := ensureFn(t, validFn) - trigger := ensureTrigger(t, rp.ValidTrigger(app.ID, fn.ID)) - - lb, err := LB() - if err != nil { - t.Fatalf("Got unexpected error: %v", err) - } - u := url.URL{ - Scheme: "http", - Host: lb, - } - u.Path = path.Join(u.Path, "t", app.Name, trigger.Source) - - body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` - content := bytes.NewBuffer([]byte(body)) - output := &bytes.Buffer{} - - resp, err := callTrigger(ctx, u.String(), content, output, "POST") - if resp != nil || err == nil || ctx.Err() == nil { - t.Fatalf("Expected response: %v err:%v", resp, err) - } -} - func callTrigger(ctx context.Context, u string, content io.Reader, output io.Writer, method string) (*http.Response, error) { if method == "" { if content == nil { diff --git a/test/fn-system-tests/exec_route_test.go b/test/fn-system-tests/exec_route_test.go index 8331983b3..cf89450da 100644 --- a/test/fn-system-tests/exec_route_test.go +++ b/test/fn-system-tests/exec_route_test.go @@ -225,39 +225,6 @@ func TestBasicConcurrentExecution(t *testing.T) { } -func TestSaturatedSystem(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) - defer cancel() - rt := &models.Route{ - Path: routeName, - Timeout: 1, - Image: "fnproject/fn-test-utils", - Format: "json", - Memory: 300, - Type: "sync", - } - rt = ensureRoute(t, rt) - - lb, err := LB() - if err != nil { - t.Fatalf("Got unexpected error: %v", err) - } - u := url.URL{ - Scheme: "http", - Host: lb, - } - u.Path = path.Join(u.Path, "r", appName, rt.Path) - - body := `{"echoContent": "HelloWorld", "sleepTime": 0, "isDebug": true}` - content := bytes.NewBuffer([]byte(body)) - output := &bytes.Buffer{} - - resp, err := callFN(ctx, u.String(), content, output, "POST") - if resp != nil || err == nil || ctx.Err() == nil { - t.Fatalf("Expected response: %v err:%v", resp, err) - } -} - func callFN(ctx context.Context, u string, content io.Reader, output io.Writer, method string) (*http.Response, error) { if method == "" { if content == nil {