From 9f29d824d6312191e2ce817abb95d3f73dd42cf4 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Thu, 26 Jul 2018 10:19:25 -0700 Subject: [PATCH] fn: New timeout for LB Placer (#1137) * fn: New timeout for LB Placer Previously, LB Placers worked hard as long as client contexts allowed for. Adding a Placer config setting to bound this by 360 seconds by default. The new timeout is not accounted during actual function execution and only applies to the amount of wait time in Placers when the call is not being executed. --- api/agent/lb_agent_test.go | 12 ++- api/runnerpool/ch_placer.go | 63 +++----------- api/runnerpool/naive_placer.go | 67 +++------------ api/runnerpool/placer_config.go | 21 +++++ api/runnerpool/placer_stats.go | 2 + api/runnerpool/placer_tracker.go | 125 ++++++++++++++++++++++++++++ api/server/server.go | 5 +- test/fn-system-tests/system_test.go | 3 +- 8 files changed, 185 insertions(+), 113 deletions(-) create mode 100644 api/runnerpool/placer_config.go create mode 100644 api/runnerpool/placer_tracker.go diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index fc0319c99..86fb24dca 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -158,7 +158,8 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC } func TestOneRunner(t *testing.T) { - placer := pool.NewNaivePlacer() + cfg := pool.NewPlacerConfig() + placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) call := &mockRunnerCall{} ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) @@ -170,7 +171,8 @@ func TestOneRunner(t *testing.T) { } func TestEnforceTimeoutFromContext(t *testing.T) { - placer := pool.NewNaivePlacer() + cfg := pool.NewPlacerConfig() + placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) call := &mockRunnerCall{} ctx, cancel := context.WithDeadline(context.Background(), time.Now()) @@ -182,7 +184,8 @@ func TestEnforceTimeoutFromContext(t *testing.T) { } func TestRRRunner(t *testing.T) { - placer := pool.NewNaivePlacer() + cfg := pool.NewPlacerConfig() + placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2) parallelCalls := 2 @@ -215,7 +218,8 @@ func TestRRRunner(t *testing.T) { } func TestEnforceLbTimeout(t *testing.T) { - placer := pool.NewNaivePlacer() + cfg := pool.NewPlacerConfig() + placer := pool.NewNaivePlacer(&cfg) rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1) parallelCalls := 5 diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index 45fe91345..ac7269ca8 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -5,24 +5,21 @@ package runnerpool import ( "context" - "time" - "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/dchest/siphash" "github.com/sirupsen/logrus" - "go.opencensus.io/stats" ) type chPlacer struct { - rrInterval time.Duration + cfg PlacerConfig } -func NewCHPlacer() Placer { - logrus.Info("Creating new CH runnerpool placer") +func NewCHPlacer(cfg *PlacerConfig) Placer { + logrus.Infof("Creating new CH runnerpool placer with config=%+v", cfg) return &chPlacer{ - rrInterval: 10 * time.Millisecond, + cfg: *cfg, } } @@ -31,76 +28,38 @@ func NewCHPlacer() Placer { // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it. func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - tracker := newAttemptTracker(ctx) - log := common.Logger(ctx) + state := NewPlacerTracker(ctx, &p.cfg) + defer state.HandleDone() // The key is just the path in this case key := call.Model().Path sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) -OutTries: for { runners, err := rp.Runners(call) if err != nil { - log.WithError(err).Error("Failed to find runners for call") - stats.Record(ctx, errorPoolCountMeasure.M(0)) - tracker.finalizeAttempts(false) + state.HandleFindRunnersFailure(err) return err } i := int(jumpConsistentHash(sum64, int32(len(runners)))) - for j := 0; j < len(runners); j++ { - if ctx.Err() != nil { - break OutTries - } + for j := 0; j < len(runners) && !state.IsDone(); j++ { r := runners[i] - tracker.recordAttempt() - tryCtx, tryCancel := context.WithCancel(ctx) - placed, err := r.TryExec(tryCtx, call) - tryCancel() - - // Only log unusual (except for too-busy) errors - if err != nil && err != models.ErrCallTimeoutServerBusy { - log.WithError(err).Errorf("Failed during call placement, placed=%v", placed) - } - + placed, err := state.TryRunner(r, call) if placed { - if err != nil { - stats.Record(ctx, placedErrorCountMeasure.M(0)) - } else { - stats.Record(ctx, placedOKCountMeasure.M(0)) - } - tracker.finalizeAttempts(true) return err } i = (i + 1) % len(runners) - - // Too Busy is super common case, we track it separately - if err == models.ErrCallTimeoutServerBusy { - stats.Record(ctx, retryTooBusyCountMeasure.M(0)) - } else { - stats.Record(ctx, retryErrorCountMeasure.M(0)) - } } - if len(runners) == 0 { - stats.Record(ctx, emptyPoolCountMeasure.M(0)) - } - - // backoff - select { - case <-ctx.Done(): - break OutTries - case <-time.After(p.rrInterval): + if !state.RetryAllBackoff(len(runners)) { + break } } - // Cancel Exit Path / Client cancelled/timedout - stats.Record(ctx, cancelCountMeasure.M(0)) - tracker.finalizeAttempts(false) return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index ae95d23f9..c1c5631bc 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -5,92 +5,51 @@ import ( "sync/atomic" "time" - "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" - "go.opencensus.io/stats" ) type naivePlacer struct { - rrInterval time.Duration - rrIndex uint64 + cfg PlacerConfig + rrIndex uint64 } -func NewNaivePlacer() Placer { - rrIndex := uint64(time.Now().Nanosecond()) - logrus.Infof("Creating new naive runnerpool placer rrIndex=%d", rrIndex) +func NewNaivePlacer(cfg *PlacerConfig) Placer { + logrus.Infof("Creating new naive runnerpool placer with config=%+v", cfg) return &naivePlacer{ - rrInterval: 10 * time.Millisecond, - rrIndex: rrIndex, + cfg: *cfg, + rrIndex: uint64(time.Now().Nanosecond()), } } func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { - tracker := newAttemptTracker(ctx) - log := common.Logger(ctx) + state := NewPlacerTracker(ctx, &sp.cfg) + defer state.HandleDone() -OutTries: for { runners, err := rp.Runners(call) if err != nil { - log.WithError(err).Error("Failed to find runners for call") - stats.Record(ctx, errorPoolCountMeasure.M(0)) - tracker.finalizeAttempts(false) + state.HandleFindRunnersFailure(err) return err } - for j := 0; j < len(runners); j++ { - if ctx.Err() != nil { - break OutTries - } + for j := 0; j < len(runners) && !state.IsDone(); j++ { i := atomic.AddUint64(&sp.rrIndex, uint64(1)) r := runners[int(i)%len(runners)] - tracker.recordAttempt() - tryCtx, tryCancel := context.WithCancel(ctx) - placed, err := r.TryExec(tryCtx, call) - tryCancel() - - // Only log unusual (except for too-busy) errors - if err != nil && err != models.ErrCallTimeoutServerBusy { - log.WithError(err).Errorf("Failed during call placement, placed=%v", placed) - } - + placed, err := state.TryRunner(r, call) if placed { - if err != nil { - stats.Record(ctx, placedErrorCountMeasure.M(0)) - } else { - stats.Record(ctx, placedOKCountMeasure.M(0)) - } - tracker.finalizeAttempts(true) return err } - - // Too Busy is super common case, we track it separately - if err == models.ErrCallTimeoutServerBusy { - stats.Record(ctx, retryTooBusyCountMeasure.M(0)) - } else { - stats.Record(ctx, retryErrorCountMeasure.M(0)) - } } - if len(runners) == 0 { - stats.Record(ctx, emptyPoolCountMeasure.M(0)) - } - - // backoff - select { - case <-ctx.Done(): - break OutTries - case <-time.After(sp.rrInterval): + if !state.RetryAllBackoff(len(runners)) { + break } } - // Cancel Exit Path / Client cancelled/timedout - stats.Record(ctx, cancelCountMeasure.M(0)) - tracker.finalizeAttempts(false) return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/placer_config.go b/api/runnerpool/placer_config.go new file mode 100644 index 000000000..77346f3b5 --- /dev/null +++ b/api/runnerpool/placer_config.go @@ -0,0 +1,21 @@ +package runnerpool + +import ( + "time" +) + +// Common config for placers. +type PlacerConfig struct { + // After all runners in the runner list is tried, apply a delay before retrying. + RetryAllDelay time.Duration `json:"retry_all_delay"` + + // Maximum amount of time a placer can hold a request during runner attempts + PlacerTimeout time.Duration `json:"placer_timeout"` +} + +func NewPlacerConfig() PlacerConfig { + return PlacerConfig{ + RetryAllDelay: 10 * time.Millisecond, + PlacerTimeout: 360 * time.Second, + } +} diff --git a/api/runnerpool/placer_stats.go b/api/runnerpool/placer_stats.go index e9705dc61..75a233486 100644 --- a/api/runnerpool/placer_stats.go +++ b/api/runnerpool/placer_stats.go @@ -16,6 +16,7 @@ var ( errorPoolCountMeasure = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "") emptyPoolCountMeasure = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "") cancelCountMeasure = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "") + placerTimeoutMeasure = stats.Int64("lb_placer_timeout_count", "LB Placer Timeout Count", "") placedErrorCountMeasure = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "") placedOKCountMeasure = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "") retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "") @@ -91,6 +92,7 @@ func RegisterPlacerViews(tagKeys []string) { createView(errorPoolCountMeasure, view.Count(), tagKeys), createView(emptyPoolCountMeasure, view.Count(), tagKeys), createView(cancelCountMeasure, view.Count(), tagKeys), + createView(placerTimeoutMeasure, view.Count(), tagKeys), createView(placedErrorCountMeasure, view.Count(), tagKeys), createView(placedOKCountMeasure, view.Count(), tagKeys), createView(retryTooBusyCountMeasure, view.Count(), tagKeys), diff --git a/api/runnerpool/placer_tracker.go b/api/runnerpool/placer_tracker.go new file mode 100644 index 000000000..55a292db6 --- /dev/null +++ b/api/runnerpool/placer_tracker.go @@ -0,0 +1,125 @@ +package runnerpool + +import ( + "context" + "time" + + "github.com/fnproject/fn/api/common" + "github.com/fnproject/fn/api/models" + + "go.opencensus.io/stats" +) + +type placerTracker struct { + cfg *PlacerConfig + requestCtx context.Context + placerCtx context.Context + cancel context.CancelFunc + tracker *attemptTracker + isPlaced bool +} + +func NewPlacerTracker(requestCtx context.Context, cfg *PlacerConfig) *placerTracker { + ctx, cancel := context.WithTimeout(context.Background(), cfg.PlacerTimeout) + return &placerTracker{ + cfg: cfg, + requestCtx: requestCtx, + placerCtx: ctx, + cancel: cancel, + tracker: newAttemptTracker(requestCtx), + } +} + +// IsDone is a non-blocking check to see if the underlying deadlines are exceeded. +func (tr *placerTracker) IsDone() bool { + return tr.requestCtx.Err() != nil || tr.placerCtx.Err() != nil +} + +// HandleFindRunnersFailure is a convenience function to record error from runnerpool.Runners() +func (tr *placerTracker) HandleFindRunnersFailure(err error) { + common.Logger(tr.requestCtx).WithError(err).Error("Failed to find runners for call") + stats.Record(tr.requestCtx, errorPoolCountMeasure.M(0)) +} + +// TryRunner is a convenience function to TryExec a call on a runner and +// analyze the results. +func (tr *placerTracker) TryRunner(r Runner, call RunnerCall) (bool, error) { + tr.tracker.recordAttempt() + + // WARNING: Do not use placerCtx here to let requestCtx take its time + // during container execution. + ctx, cancel := context.WithCancel(tr.requestCtx) + isPlaced, err := r.TryExec(ctx, call) + cancel() + + if !isPlaced { + + // Too Busy is super common case, we track it separately + if err == models.ErrCallTimeoutServerBusy { + stats.Record(tr.requestCtx, retryTooBusyCountMeasure.M(0)) + } else { + stats.Record(tr.requestCtx, retryErrorCountMeasure.M(0)) + } + + } else { + + // Only log unusual (except for too-busy) errors for isPlaced (customer impacting) calls + if err != nil && err != models.ErrCallTimeoutServerBusy { + logger := common.Logger(ctx).WithField("runner_addr", r.Address()) + logger.WithError(err).Errorf("Failed during call placement") + } + + if err != nil { + stats.Record(tr.requestCtx, placedErrorCountMeasure.M(0)) + } else { + stats.Record(tr.requestCtx, placedOKCountMeasure.M(0)) + } + + // Call is now committed. In other words, it was 'run'. We are done. + tr.isPlaced = true + } + + return isPlaced, err +} + +// HandleDone is cleanup function to cancel pending contexts and to +// record stats for the placement session. +func (tr *placerTracker) HandleDone() { + + // Cancel Exit Path / Client cancelled/timedout + if tr.requestCtx.Err() != nil { + stats.Record(tr.requestCtx, cancelCountMeasure.M(0)) + } + + // This means our placer timed out. We ignore tr.isPlaced calls + // since we do not check/track placer ctx timeout if a call was + // actually ran on a runner. This means, placer timeout can be + // 10 secs, but a call can execute for 60 secs in a container. + if !tr.isPlaced && tr.placerCtx.Err() != nil { + stats.Record(tr.requestCtx, placerTimeoutMeasure.M(0)) + } + + tr.tracker.finalizeAttempts(tr.isPlaced) + tr.cancel() +} + +// RetryAllBackoff blocks until it is time to try the runner list again. Returns +// false if the placer should stop trying. +func (tr *placerTracker) RetryAllBackoff(numOfRunners int) bool { + + // This means Placer is operating on an empty list. No runners + // available. Record it. + if numOfRunners == 0 { + stats.Record(tr.requestCtx, emptyPoolCountMeasure.M(0)) + } + + select { + case <-tr.requestCtx.Done(): // client side timeout/cancel + return false + case <-tr.placerCtx.Done(): // placer wait timeout + return false + case <-time.After(tr.cfg.RetryAllDelay): + } + + return true +} diff --git a/api/server/server.go b/api/server/server.go index eddda341e..2b407103e 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -586,12 +586,13 @@ func WithAgentFromEnv() Option { } // Select the placement algorithm + placerCfg := pool.NewPlacerConfig() var placer pool.Placer switch getEnv(EnvLBPlacementAlg, "") { case "ch": - placer = pool.NewCHPlacer() + placer = pool.NewCHPlacer(&placerCfg) default: - placer = pool.NewNaivePlacer() + placer = pool.NewNaivePlacer(&placerCfg) } keys := []string{"fn_appname", "fn_path"} diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index 5538c421a..62a4e7884 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -217,7 +217,8 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { if err != nil { return nil, err } - placer := pool.NewNaivePlacer() + placerCfg := pool.NewPlacerConfig() + placer := pool.NewNaivePlacer(&placerCfg) keys := []string{"fn_appname", "fn_path"} pool.RegisterPlacerViews(keys)