From ee262901a227741b2bb6822120d8ee03916783a6 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Tue, 10 Apr 2018 10:48:12 -0700 Subject: [PATCH] fn: handleCallEnd and submit improvements (#919) * fn: move call error/end handling to handleCallEnd This simplifies submit() function but moves the burden of retriable-versus-committed request handling and slot.Close() responsibility to handleCallEnd(). --- api/agent/agent.go | 61 ++++++++++++++++++++++++---------- api/agent/agent_test.go | 37 +++++++++++++++++---- api/agent/lb_agent.go | 57 +++++++++++-------------------- test/fn-api-tests/exec_test.go | 30 +++++++++++++---- 4 files changed, 116 insertions(+), 69 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 1ff4434c4..ca3bfad91 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -238,39 +238,70 @@ func (a *agent) submit(ctx context.Context, call *call) error { slot, err := a.getSlot(ctx, call) if err != nil { - handleStatsDequeue(ctx, err) - return transformTimeout(err, true) + return a.handleCallEnd(ctx, call, slot, err, false) } - defer slot.Close(ctx) // notify our slot is free once we're done err = call.Start(ctx) if err != nil { - handleStatsDequeue(ctx, err) - return transformTimeout(err, true) + return a.handleCallEnd(ctx, call, slot, err, false) } statsDequeueAndStart(ctx) // pass this error (nil or otherwise) to end directly, to store status, etc err = slot.exec(ctx, call) - handleStatsEnd(ctx, err) - a.handleCallEnd(ctx, call, err) - return transformTimeout(err, false) + return a.handleCallEnd(ctx, call, slot, err, true) } -func (a *agent) handleCallEnd(ctx context.Context, call *call, err error) { +func (a *agent) scheduleCallEnd(fn func()) { a.wg.Add(1) atomic.AddInt64(&a.callEndCount, 1) go func() { - ctx = common.BackgroundContext(ctx) - ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout) - call.End(ctx, err) - cancel() + fn() atomic.AddInt64(&a.callEndCount, -1) a.wg.Done() }() } +func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isCommitted bool) error { + + // For hot-containers, slot close is a simple channel close... No need + // to handle it async. Execute it here ASAP + if slot != nil && protocol.IsStreamable(protocol.Protocol(call.Format)) { + slot.Close(ctx) + slot = nil + } + + // This means call was routed (executed), in order to reduce latency here + // we perform most of these tasks in go-routine asynchronously. + if isCommitted { + a.scheduleCallEnd(func() { + ctx = common.BackgroundContext(ctx) + if slot != nil { + slot.Close(ctx) // (no timeout) + } + ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout) + call.End(ctx, err) + cancel() + }) + + handleStatsEnd(ctx, err) + return transformTimeout(err, false) + } + + // The call did not succeed. And it is retriable. We close the slot + // ASAP in the background if we haven't already done so (cold-container case), + // in order to keep latency down. + if slot != nil { + a.scheduleCallEnd(func() { + slot.Close(common.BackgroundContext(ctx)) // (no timeout) + }) + } + + handleStatsDequeue(ctx, err) + return transformTimeout(err, true) +} + func transformTimeout(e error, isRetriable bool) error { if e == context.DeadlineExceeded { if isRetriable { @@ -522,10 +553,6 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error { func (s *coldSlot) Close(ctx context.Context) error { if s.cookie != nil { - // call this from here so that in exec we don't have to eat container - // removal latency - // NOTE ensure container removal, no ctx timeout - ctx = common.BackgroundContext(ctx) s.cookie.Close(ctx) } if s.tok != nil { diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index eddadbb1a..11b8b7b9c 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -3,6 +3,7 @@ package agent import ( "bufio" "bytes" + "context" "encoding/json" "fmt" "io" @@ -391,6 +392,17 @@ func TestLoggerTooBig(t *testing.T) { logger.Close() } +type testListener struct { + afterCall func(context.Context, *models.Call) error +} + +func (l testListener) AfterCall(ctx context.Context, call *models.Call) error { + return l.afterCall(ctx, call) +} +func (l testListener) BeforeCall(context.Context, *models.Call) error { + return nil +} + func TestSubmitError(t *testing.T) { app := &models.App{Name: "myapp"} app.SetDefaults() @@ -439,6 +451,23 @@ func TestSubmitError(t *testing.T) { a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock))) defer a.Close() + var wg sync.WaitGroup + wg.Add(1) + + afterCall := func(ctx context.Context, call *models.Call) error { + defer wg.Done() + if cm.Status != "error" { + t.Fatal("expected status to be set to 'error' but was", cm.Status) + } + + if cm.Error == "" { + t.Fatal("expected error string to be set on call") + } + return nil + } + + a.AddCallListener(&testListener{afterCall: afterCall}) + callI, err := a.GetCall(FromModel(cm)) if err != nil { t.Fatal(err) @@ -449,13 +478,7 @@ func TestSubmitError(t *testing.T) { t.Fatal("expected error but got none") } - if cm.Status != "error" { - t.Fatal("expected status to be set to 'error' but was", cm.Status) - } - - if cm.Error == "" { - t.Fatal("expected error string to be set on call") - } + wg.Wait() } // this implements io.Reader, but importantly, is not a strings.Reader or diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index 77cf605cb..05c8b0181 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -8,34 +8,11 @@ import ( "github.com/sirupsen/logrus" "go.opencensus.io/trace" - "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" pool "github.com/fnproject/fn/api/runnerpool" "github.com/fnproject/fn/fnext" ) -type remoteSlot struct { - lbAgent *lbAgent -} - -func (s *remoteSlot) exec(ctx context.Context, call pool.RunnerCall) error { - a := s.lbAgent - - err := a.placer.PlaceCall(a.rp, ctx, call) - if err != nil { - logrus.WithError(err).Error("Failed to place call") - } - return err -} - -func (s *remoteSlot) Close(ctx context.Context) error { - return nil -} - -func (s *remoteSlot) Error() error { - return nil -} - type naivePlacer struct { } @@ -76,8 +53,15 @@ func (sp *naivePlacer) PlaceCall(rp pool.RunnerPool, ctx context.Context, call p if remaining <= 0 { return models.ErrCallTimeoutServerBusy } + // backoff - time.Sleep(minDuration(retryWaitInterval, remaining)) + select { + case <-ctx.Done(): + return models.ErrCallTimeoutServerBusy + case <-timeout: + return models.ErrCallTimeoutServerBusy + case <-time.After(minDuration(retryWaitInterval, remaining)): + } } } } @@ -184,27 +168,19 @@ func (a *lbAgent) Submit(callI Call) error { func (a *lbAgent) submit(ctx context.Context, call *call) error { statsEnqueue(ctx) - slot := &remoteSlot{lbAgent: a} - - defer slot.Close(ctx) // notify our slot is free once we're done - err := call.Start(ctx) if err != nil { - handleStatsDequeue(ctx, err) - return transformTimeout(err, true) + return a.handleCallEnd(ctx, call, err, false) } statsDequeueAndStart(ctx) - // pass this error (nil or otherwise) to end directly, to store status, etc - err = slot.exec(ctx, call) - handleStatsEnd(ctx, err) + err = a.placer.PlaceCall(a.rp, ctx, call) + if err != nil { + logrus.WithError(err).Error("Failed to place call") + } - // TODO: we need to allocate more time to store the call + logs in case the call timed out, - // but this could put us over the timeout if the call did not reply yet (need better policy). - ctx = common.BackgroundContext(ctx) - err = call.End(ctx, err) - return transformTimeout(err, false) + return a.handleCallEnd(ctx, call, err, true) } func (a *lbAgent) AddCallListener(cl fnext.CallListener) { @@ -215,3 +191,8 @@ func (a *lbAgent) Enqueue(context.Context, *models.Call) error { logrus.Fatal("Enqueue not implemented. Panicking.") return nil } + +func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isCommitted bool) error { + delegatedAgent := a.delegatedAgent.(*agent) + return delegatedAgent.handleCallEnd(ctx, call, nil, err, isCommitted) +} diff --git a/test/fn-api-tests/exec_test.go b/test/fn-api-tests/exec_test.go index ef7344001..84cd1fc63 100644 --- a/test/fn-api-tests/exec_test.go +++ b/test/fn-api-tests/exec_test.go @@ -314,11 +314,19 @@ func TestCanWriteLogs(t *testing.T) { } // TODO this test is redundant we have 3 tests for this? - _, err := s.Client.Operations.GetAppsAppCallsCallLog(cfg) - if err != nil { - t.Error(err.Error()) - } + retryErr := APICallWithRetry(t, 10, time.Second*2, func() (err error) { + _, err = s.Client.Operations.GetAppsAppCallsCallLog(cfg) + return err + }) + if retryErr != nil { + t.Error(retryErr.Error()) + } else { + _, err := s.Client.Operations.GetAppsAppCallsCallLog(cfg) + if err != nil { + t.Error(err.Error()) + } + } } func TestOversizedLog(t *testing.T) { @@ -353,10 +361,18 @@ func TestOversizedLog(t *testing.T) { Context: s.Context, } - logObj, err := s.Client.Operations.GetAppsAppCallsCallLog(cfg) - if err != nil { - t.Error(err.Error()) + retryErr := APICallWithRetry(t, 10, time.Second*2, func() (err error) { + _, err = s.Client.Operations.GetAppsAppCallsCallLog(cfg) + return err + }) + + if retryErr != nil { + t.Error(retryErr.Error()) } else { + logObj, err := s.Client.Operations.GetAppsAppCallsCallLog(cfg) + if err != nil { + t.Error(err.Error()) + } log := logObj.Payload.Log.Log if len(log) >= size { t.Errorf("Log entry suppose to be truncated up to expected size %v, got %v",