fn: handleCallEnd and submit improvements (#919)

* fn: move call error/end handling to handleCallEnd

This simplifies the submit() function, but moves the burden of
retriable-versus-committed request handling, and the responsibility
for slot.Close(), to handleCallEnd().
Tolga Ceylan authored 2018-04-10 10:48:12 -07:00, committed by GitHub
parent f705fc8d8f · commit ee262901a2
4 changed files with 116 additions and 69 deletions
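
The heart of the change is which of fn's two timeout errors a failed call maps to, which is exactly the retriable-versus-committed split that handleCallEnd now owns. As a rough sketch of that mapping (transformTimeout is only partially visible in the diff below; this reconstruction assumes fn's models.ErrCallTimeoutServerBusy and models.ErrCallTimeout, which appear elsewhere in this diff):

	// Retriable: the call never executed (no slot, or Start failed), so the
	// client or LB may safely resubmit. Committed: a container actually ran
	// the call, so a retry could execute it twice.
	func transformTimeout(e error, isRetriable bool) error {
		if e == context.DeadlineExceeded {
			if isRetriable {
				return models.ErrCallTimeoutServerBusy // assumption: "server busy", retryable
			}
			return models.ErrCallTimeout // assumption: hard timeout, do not retry
		}
		return e
	}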


@@ -238,39 +238,70 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 	slot, err := a.getSlot(ctx, call)
 	if err != nil {
-		handleStatsDequeue(ctx, err)
-		return transformTimeout(err, true)
+		return a.handleCallEnd(ctx, call, slot, err, false)
 	}
-	defer slot.Close(ctx) // notify our slot is free once we're done
 
 	err = call.Start(ctx)
 	if err != nil {
-		handleStatsDequeue(ctx, err)
-		return transformTimeout(err, true)
+		return a.handleCallEnd(ctx, call, slot, err, false)
 	}
 
 	statsDequeueAndStart(ctx)
 
 	// pass this error (nil or otherwise) to end directly, to store status, etc
 	err = slot.exec(ctx, call)
-	handleStatsEnd(ctx, err)
-	a.handleCallEnd(ctx, call, err)
-	return transformTimeout(err, false)
+	return a.handleCallEnd(ctx, call, slot, err, true)
 }
 
-func (a *agent) handleCallEnd(ctx context.Context, call *call, err error) {
+func (a *agent) scheduleCallEnd(fn func()) {
 	a.wg.Add(1)
+	atomic.AddInt64(&a.callEndCount, 1)
 	go func() {
-		ctx = common.BackgroundContext(ctx)
-		ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
-		call.End(ctx, err)
-		cancel()
+		fn()
+		atomic.AddInt64(&a.callEndCount, -1)
 		a.wg.Done()
 	}()
 }
 
+func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isCommitted bool) error {
+	// For hot-containers, slot close is a simple channel close... No need
+	// to handle it async. Execute it here ASAP
+	if slot != nil && protocol.IsStreamable(protocol.Protocol(call.Format)) {
+		slot.Close(ctx)
+		slot = nil
+	}
+
+	// This means call was routed (executed), in order to reduce latency here
+	// we perform most of these tasks in go-routine asynchronously.
+	if isCommitted {
+		a.scheduleCallEnd(func() {
+			ctx = common.BackgroundContext(ctx)
+			if slot != nil {
+				slot.Close(ctx) // (no timeout)
+			}
+			ctx, cancel := context.WithTimeout(ctx, a.cfg.CallEndTimeout)
+			call.End(ctx, err)
+			cancel()
+		})
+
+		handleStatsEnd(ctx, err)
+		return transformTimeout(err, false)
+	}
+
+	// The call did not succeed. And it is retriable. We close the slot
+	// ASAP in the background if we haven't already done so (cold-container case),
+	// in order to keep latency down.
+	if slot != nil {
+		a.scheduleCallEnd(func() {
+			slot.Close(common.BackgroundContext(ctx)) // (no timeout)
+		})
+	}
+
+	handleStatsDequeue(ctx, err)
+	return transformTimeout(err, true)
+}
 
 func transformTimeout(e error, isRetriable bool) error {
 	if e == context.DeadlineExceeded {
 		if isRetriable {
@@ -522,10 +553,6 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error {
 func (s *coldSlot) Close(ctx context.Context) error {
 	if s.cookie != nil {
-		// call this from here so that in exec we don't have to eat container
-		// removal latency
-		// NOTE ensure container removal, no ctx timeout
-		ctx = common.BackgroundContext(ctx)
 		s.cookie.Close(ctx)
 	}
 	if s.tok != nil {
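
Since call.End and, for cold containers, slot.Close now happen off the request path, shutdown safety rests entirely on the scheduleCallEnd bookkeeping above. A self-contained sketch of that pattern (agent.Close here is a stand-in for the real shutdown path, which waits on the same WaitGroup):

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	type agent struct {
		callEndCount int64 // first field: keeps 64-bit alignment for atomics
		wg           sync.WaitGroup
	}

	// scheduleCallEnd runs fn on its own goroutine while keeping the
	// WaitGroup and the in-flight counter in sync, so Close can drain it.
	func (a *agent) scheduleCallEnd(fn func()) {
		a.wg.Add(1)
		atomic.AddInt64(&a.callEndCount, 1)
		go func() {
			fn()
			atomic.AddInt64(&a.callEndCount, -1)
			a.wg.Done()
		}()
	}

	// Close blocks until every scheduled call.End / slot.Close has run.
	func (a *agent) Close() {
		a.wg.Wait()
	}

	func main() {
		a := &agent{}
		a.scheduleCallEnd(func() { fmt.Println("persist call status") })
		a.Close() // no call-end work is lost on shutdown
	}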


@@ -3,6 +3,7 @@ package agent
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -391,6 +392,17 @@ func TestLoggerTooBig(t *testing.T) {
 	logger.Close()
 }
 
+type testListener struct {
+	afterCall func(context.Context, *models.Call) error
+}
+
+func (l testListener) AfterCall(ctx context.Context, call *models.Call) error {
+	return l.afterCall(ctx, call)
+}
+
+func (l testListener) BeforeCall(context.Context, *models.Call) error {
+	return nil
+}
+
 func TestSubmitError(t *testing.T) {
 	app := &models.App{Name: "myapp"}
 	app.SetDefaults()
@@ -439,6 +451,23 @@ func TestSubmitError(t *testing.T) {
 	a := New(NewDirectDataAccess(ds, ds, new(mqs.Mock)))
 	defer a.Close()
 
+	var wg sync.WaitGroup
+	wg.Add(1)
+	afterCall := func(ctx context.Context, call *models.Call) error {
+		defer wg.Done()
+		if cm.Status != "error" {
+			t.Fatal("expected status to be set to 'error' but was", cm.Status)
+		}
+		if cm.Error == "" {
+			t.Fatal("expected error string to be set on call")
+		}
+		return nil
+	}
+	a.AddCallListener(&testListener{afterCall: afterCall})
+
 	callI, err := a.GetCall(FromModel(cm))
 	if err != nil {
 		t.Fatal(err)
@@ -449,13 +478,7 @@ func TestSubmitError(t *testing.T) {
 		t.Fatal("expected error but got none")
 	}
 
-	if cm.Status != "error" {
-		t.Fatal("expected status to be set to 'error' but was", cm.Status)
-	}
-	if cm.Error == "" {
-		t.Fatal("expected error string to be set on call")
-	}
+	wg.Wait()
 }
 
 // this implements io.Reader, but importantly, is not a strings.Reader or
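
One caveat with asserting inside the listener: the afterCall closure now runs on the scheduleCallEnd goroutine, and Go's testing package only permits t.Fatal/t.FailNow on the goroutine running the test. A safer variant of the closure above (hypothetical, not part of this commit) would report with t.Error instead:

	afterCall := func(ctx context.Context, call *models.Call) error {
		defer wg.Done()
		// t.Error is safe from other goroutines; t.Fatal is not, because
		// FailNow calls runtime.Goexit on whichever goroutine invokes it.
		if cm.Status != "error" {
			t.Error("expected status to be set to 'error' but was", cm.Status)
		}
		if cm.Error == "" {
			t.Error("expected error string to be set on call")
		}
		return nil
	}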


@@ -8,34 +8,11 @@ import (
 	"github.com/sirupsen/logrus"
 	"go.opencensus.io/trace"
 
-	"github.com/fnproject/fn/api/common"
 	"github.com/fnproject/fn/api/models"
 	pool "github.com/fnproject/fn/api/runnerpool"
 	"github.com/fnproject/fn/fnext"
 )
 
-type remoteSlot struct {
-	lbAgent *lbAgent
-}
-
-func (s *remoteSlot) exec(ctx context.Context, call pool.RunnerCall) error {
-	a := s.lbAgent
-
-	err := a.placer.PlaceCall(a.rp, ctx, call)
-	if err != nil {
-		logrus.WithError(err).Error("Failed to place call")
-	}
-	return err
-}
-
-func (s *remoteSlot) Close(ctx context.Context) error {
-	return nil
-}
-
-func (s *remoteSlot) Error() error {
-	return nil
-}
-
 type naivePlacer struct {
 }

@@ -76,8 +53,15 @@ func (sp *naivePlacer) PlaceCall(rp pool.RunnerPool, ctx context.Context, call p
 		if remaining <= 0 {
 			return models.ErrCallTimeoutServerBusy
 		}
-		// backoff
-		time.Sleep(minDuration(retryWaitInterval, remaining))
+		select {
+		case <-ctx.Done():
+			return models.ErrCallTimeoutServerBusy
+		case <-timeout:
+			return models.ErrCallTimeoutServerBusy
+		case <-time.After(minDuration(retryWaitInterval, remaining)):
+		}
 	}
 }
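
The select above replaces a bare time.Sleep so the backoff wait can be interrupted by cancellation or the overall deadline. The same idea as a reusable helper (a sketch, not part of this commit):

	import (
		"context"
		"time"
	)

	// sleepCtx waits for d but returns early if ctx is cancelled or its
	// deadline passes, so callers never block beyond their budget.
	func sleepCtx(ctx context.Context, d time.Duration) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // cancelled or deadline exceeded
		case <-time.After(d):
			return nil // full backoff elapsed
		}
	}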
@@ -184,27 +168,19 @@ func (a *lbAgent) Submit(callI Call) error {
 func (a *lbAgent) submit(ctx context.Context, call *call) error {
 	statsEnqueue(ctx)
 
-	slot := &remoteSlot{lbAgent: a}
-	defer slot.Close(ctx) // notify our slot is free once we're done
-
 	err := call.Start(ctx)
 	if err != nil {
-		handleStatsDequeue(ctx, err)
-		return transformTimeout(err, true)
+		return a.handleCallEnd(ctx, call, err, false)
 	}
 
 	statsDequeueAndStart(ctx)
 
-	// pass this error (nil or otherwise) to end directly, to store status, etc
-	err = slot.exec(ctx, call)
-	handleStatsEnd(ctx, err)
+	err = a.placer.PlaceCall(a.rp, ctx, call)
+	if err != nil {
+		logrus.WithError(err).Error("Failed to place call")
+	}
 
-	// TODO: we need to allocate more time to store the call + logs in case the call timed out,
-	// but this could put us over the timeout if the call did not reply yet (need better policy).
-	ctx = common.BackgroundContext(ctx)
-	err = call.End(ctx, err)
-	return transformTimeout(err, false)
+	return a.handleCallEnd(ctx, call, err, true)
 }
 
 func (a *lbAgent) AddCallListener(cl fnext.CallListener) {
@@ -215,3 +191,8 @@ func (a *lbAgent) Enqueue(context.Context, *models.Call) error {
 	logrus.Fatal("Enqueue not implemented. Panicking.")
 	return nil
 }
+
+func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isCommitted bool) error {
+	delegatedAgent := a.delegatedAgent.(*agent)
+	return delegatedAgent.handleCallEnd(ctx, call, nil, err, isCommitted)
+}
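
Note the LB agent passes slot == nil, so the shared handleCallEnd skips both slot-closing branches and only schedules call.End plus the stats and timeout handling. The bare type assertion will panic if delegatedAgent is ever not the concrete *agent; a defensive variant (purely a sketch, not in this commit) could be:

	func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isCommitted bool) error {
		delegated, ok := a.delegatedAgent.(*agent)
		if !ok {
			return err // assumption: surface the original error rather than panic
		}
		return delegated.handleCallEnd(ctx, call, nil, err, isCommitted)
	}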