diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go
index fc0319c99..86fb24dca 100644
--- a/api/agent/lb_agent_test.go
+++ b/api/agent/lb_agent_test.go
@@ -158,7 +158,8 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC
 }
 
 func TestOneRunner(t *testing.T) {
-    placer := pool.NewNaivePlacer()
+    cfg := pool.NewPlacerConfig()
+    placer := pool.NewNaivePlacer(&cfg)
     rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
     call := &mockRunnerCall{}
     ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
@@ -170,7 +171,8 @@ func TestOneRunner(t *testing.T) {
 }
 
 func TestEnforceTimeoutFromContext(t *testing.T) {
-    placer := pool.NewNaivePlacer()
+    cfg := pool.NewPlacerConfig()
+    placer := pool.NewNaivePlacer(&cfg)
     rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
     call := &mockRunnerCall{}
     ctx, cancel := context.WithDeadline(context.Background(), time.Now())
@@ -182,7 +184,8 @@ func TestEnforceTimeoutFromContext(t *testing.T) {
 }
 
 func TestRRRunner(t *testing.T) {
-    placer := pool.NewNaivePlacer()
+    cfg := pool.NewPlacerConfig()
+    placer := pool.NewNaivePlacer(&cfg)
     rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
 
     parallelCalls := 2
@@ -215,7 +218,8 @@ func TestRRRunner(t *testing.T) {
 }
 
 func TestEnforceLbTimeout(t *testing.T) {
-    placer := pool.NewNaivePlacer()
+    cfg := pool.NewPlacerConfig()
+    placer := pool.NewNaivePlacer(&cfg)
     rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1)
 
     parallelCalls := 5
diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go
index 45fe91345..ac7269ca8 100644
--- a/api/runnerpool/ch_placer.go
+++ b/api/runnerpool/ch_placer.go
@@ -5,24 +5,21 @@ package runnerpool
 
 import (
     "context"
-    "time"
 
-    "github.com/fnproject/fn/api/common"
     "github.com/fnproject/fn/api/models"
 
     "github.com/dchest/siphash"
     "github.com/sirupsen/logrus"
-    "go.opencensus.io/stats"
 )
 
 type chPlacer struct {
-    rrInterval time.Duration
+    cfg PlacerConfig
 }
 
-func NewCHPlacer() Placer {
-    logrus.Info("Creating new CH runnerpool placer")
+func NewCHPlacer(cfg *PlacerConfig) Placer {
+    logrus.Infof("Creating new CH runnerpool placer with config=%+v", cfg)
     return &chPlacer{
-        rrInterval: 10 * time.Millisecond,
+        cfg: *cfg,
     }
 }
 
@@ -31,76 +28,38 @@ func NewCHPlacer() Placer {
 // This placer uses a consistent hash to select a runner. If the runner is not available, it walks the remaining
 // runners in the hash ring until it finds an available one. The LB does not apply back pressure here; runners use
 // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
 func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
-    tracker := newAttemptTracker(ctx)
-    log := common.Logger(ctx)
+    state := NewPlacerTracker(ctx, &p.cfg)
+    defer state.HandleDone()
 
     // The key is just the path in this case
     key := call.Model().Path
     sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
 
-OutTries:
     for {
         runners, err := rp.Runners(call)
         if err != nil {
-            log.WithError(err).Error("Failed to find runners for call")
-            stats.Record(ctx, errorPoolCountMeasure.M(0))
-            tracker.finalizeAttempts(false)
+            state.HandleFindRunnersFailure(err)
             return err
         }
 
         i := int(jumpConsistentHash(sum64, int32(len(runners))))
-        for j := 0; j < len(runners); j++ {
-            if ctx.Err() != nil {
-                break OutTries
-            }
+        for j := 0; j < len(runners) && !state.IsDone(); j++ {
 
             r := runners[i]
 
-            tracker.recordAttempt()
-            tryCtx, tryCancel := context.WithCancel(ctx)
-            placed, err := r.TryExec(tryCtx, call)
-            tryCancel()
-
-            // Only log unusual (except for too-busy) errors
-            if err != nil && err != models.ErrCallTimeoutServerBusy {
-                log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
-            }
-
+            placed, err := state.TryRunner(r, call)
             if placed {
-                if err != nil {
-                    stats.Record(ctx, placedErrorCountMeasure.M(0))
-                } else {
-                    stats.Record(ctx, placedOKCountMeasure.M(0))
-                }
-                tracker.finalizeAttempts(true)
                 return err
             }
 
             i = (i + 1) % len(runners)
-
-            // Too Busy is super common case, we track it separately
-            if err == models.ErrCallTimeoutServerBusy {
-                stats.Record(ctx, retryTooBusyCountMeasure.M(0))
-            } else {
-                stats.Record(ctx, retryErrorCountMeasure.M(0))
-            }
         }
 
-        if len(runners) == 0 {
-            stats.Record(ctx, emptyPoolCountMeasure.M(0))
-        }
-
-        // backoff
-        select {
-        case <-ctx.Done():
-            break OutTries
-        case <-time.After(p.rrInterval):
+        if !state.RetryAllBackoff(len(runners)) {
+            break
         }
     }
 
-    // Cancel Exit Path / Client cancelled/timedout
-    stats.Record(ctx, cancelCountMeasure.M(0))
-    tracker.finalizeAttempts(false)
     return models.ErrCallTimeoutServerBusy
 }
diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go
index ae95d23f9..c1c5631bc 100644
--- a/api/runnerpool/naive_placer.go
+++ b/api/runnerpool/naive_placer.go
@@ -5,92 +5,51 @@ import (
     "sync/atomic"
     "time"
 
-    "github.com/fnproject/fn/api/common"
     "github.com/fnproject/fn/api/models"
 
     "github.com/sirupsen/logrus"
-    "go.opencensus.io/stats"
 )
 
 type naivePlacer struct {
-    rrInterval time.Duration
-    rrIndex    uint64
+    cfg     PlacerConfig
+    rrIndex uint64
 }
 
-func NewNaivePlacer() Placer {
-    rrIndex := uint64(time.Now().Nanosecond())
-    logrus.Infof("Creating new naive runnerpool placer rrIndex=%d", rrIndex)
+func NewNaivePlacer(cfg *PlacerConfig) Placer {
+    logrus.Infof("Creating new naive runnerpool placer with config=%+v", cfg)
     return &naivePlacer{
-        rrInterval: 10 * time.Millisecond,
-        rrIndex:    rrIndex,
+        cfg:     *cfg,
+        rrIndex: uint64(time.Now().Nanosecond()),
     }
 }
 
 func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
-    tracker := newAttemptTracker(ctx)
-    log := common.Logger(ctx)
+    state := NewPlacerTracker(ctx, &sp.cfg)
+    defer state.HandleDone()
 
-OutTries:
     for {
         runners, err := rp.Runners(call)
         if err != nil {
-            log.WithError(err).Error("Failed to find runners for call")
-            stats.Record(ctx, errorPoolCountMeasure.M(0))
-            tracker.finalizeAttempts(false)
+            state.HandleFindRunnersFailure(err)
             return err
         }
 
-        for j := 0; j < len(runners); j++ {
-            if ctx.Err() != nil {
-                break OutTries
-            }
+        for j := 0; j < len(runners) && !state.IsDone(); j++ {
 
             i := atomic.AddUint64(&sp.rrIndex, uint64(1))
             r := runners[int(i)%len(runners)]
 
-            tracker.recordAttempt()
-            tryCtx, tryCancel := context.WithCancel(ctx)
-            placed, err := r.TryExec(tryCtx, call)
-            tryCancel()
-
-            // Only log unusual (except for too-busy) errors
-            if err != nil && err != models.ErrCallTimeoutServerBusy {
-                log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
-            }
-
+            placed, err := state.TryRunner(r, call)
             if placed {
-                if err != nil {
-                    stats.Record(ctx, placedErrorCountMeasure.M(0))
-                } else {
-                    stats.Record(ctx, placedOKCountMeasure.M(0))
-                }
-                tracker.finalizeAttempts(true)
                 return err
             }
-
-            // Too Busy is super common case, we track it separately
-            if err == models.ErrCallTimeoutServerBusy {
-                stats.Record(ctx, retryTooBusyCountMeasure.M(0))
-            } else {
-                stats.Record(ctx, retryErrorCountMeasure.M(0))
-            }
         }
 
-        if len(runners) == 0 {
-            stats.Record(ctx, emptyPoolCountMeasure.M(0))
-        }
-
-        // backoff
-        select {
-        case <-ctx.Done():
-            break OutTries
-        case <-time.After(sp.rrInterval):
+        if !state.RetryAllBackoff(len(runners)) {
+            break
         }
     }
 
-    // Cancel Exit Path / Client cancelled/timedout
-    stats.Record(ctx, cancelCountMeasure.M(0))
-    tracker.finalizeAttempts(false)
     return models.ErrCallTimeoutServerBusy
 }
diff --git a/api/runnerpool/placer_config.go b/api/runnerpool/placer_config.go
new file mode 100644
index 000000000..77346f3b5
--- /dev/null
+++ b/api/runnerpool/placer_config.go
@@ -0,0 +1,21 @@
+package runnerpool
+
+import (
+    "time"
+)
+
+// Common config for placers.
+type PlacerConfig struct {
+    // After all runners in the runner list have been tried, apply a delay before retrying.
+    RetryAllDelay time.Duration `json:"retry_all_delay"`
+
+    // Maximum amount of time a placer can hold a request during runner attempts.
+    PlacerTimeout time.Duration `json:"placer_timeout"`
+}
+
+func NewPlacerConfig() PlacerConfig {
+    return PlacerConfig{
+        RetryAllDelay: 10 * time.Millisecond,
+        PlacerTimeout: 360 * time.Second,
+    }
+}
diff --git a/api/runnerpool/placer_stats.go b/api/runnerpool/placer_stats.go
index e9705dc61..75a233486 100644
--- a/api/runnerpool/placer_stats.go
+++ b/api/runnerpool/placer_stats.go
@@ -16,6 +16,7 @@ var (
     errorPoolCountMeasure    = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "")
     emptyPoolCountMeasure    = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "")
     cancelCountMeasure       = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "")
+    placerTimeoutMeasure     = stats.Int64("lb_placer_timeout_count", "LB Placer Timeout Count", "")
     placedErrorCountMeasure  = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "")
     placedOKCountMeasure     = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "")
     retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "")
@@ -91,6 +92,7 @@ func RegisterPlacerViews(tagKeys []string) {
         createView(errorPoolCountMeasure, view.Count(), tagKeys),
         createView(emptyPoolCountMeasure, view.Count(), tagKeys),
         createView(cancelCountMeasure, view.Count(), tagKeys),
+        createView(placerTimeoutMeasure, view.Count(), tagKeys),
         createView(placedErrorCountMeasure, view.Count(), tagKeys),
         createView(placedOKCountMeasure, view.Count(), tagKeys),
         createView(retryTooBusyCountMeasure, view.Count(), tagKeys),
diff --git a/api/runnerpool/placer_tracker.go b/api/runnerpool/placer_tracker.go
new file mode 100644
index 000000000..55a292db6
--- /dev/null
+++ b/api/runnerpool/placer_tracker.go
@@ -0,0 +1,125 @@
+package runnerpool
+
+import (
+    "context"
+    "time"
+
+    "github.com/fnproject/fn/api/common"
+    "github.com/fnproject/fn/api/models"
+
+    "go.opencensus.io/stats"
+)
+
+type placerTracker struct {
+    cfg        *PlacerConfig
+    requestCtx context.Context
+    placerCtx  context.Context
+    cancel     context.CancelFunc
+    tracker    *attemptTracker
+    isPlaced   bool
+}
+
+func NewPlacerTracker(requestCtx context.Context, cfg *PlacerConfig) *placerTracker {
+    ctx, cancel := context.WithTimeout(context.Background(), cfg.PlacerTimeout)
+    return &placerTracker{
+        cfg:        cfg,
+        requestCtx: requestCtx,
+        placerCtx:  ctx,
+        cancel:     cancel,
+        tracker:    newAttemptTracker(requestCtx),
+    }
+}
+
+// IsDone is a non-blocking check to see if the underlying deadlines are exceeded.
+func (tr *placerTracker) IsDone() bool {
+    return tr.requestCtx.Err() != nil || tr.placerCtx.Err() != nil
+}
+
+// HandleFindRunnersFailure is a convenience function to record an error from runnerpool.Runners().
+func (tr *placerTracker) HandleFindRunnersFailure(err error) {
+    common.Logger(tr.requestCtx).WithError(err).Error("Failed to find runners for call")
+    stats.Record(tr.requestCtx, errorPoolCountMeasure.M(0))
+}
+
+// TryRunner is a convenience function to TryExec a call on a runner and
+// analyze the results.
+func (tr *placerTracker) TryRunner(r Runner, call RunnerCall) (bool, error) {
+    tr.tracker.recordAttempt()
+
+    // WARNING: Do not use placerCtx here; requestCtx must be allowed its full
+    // time during container execution.
+    ctx, cancel := context.WithCancel(tr.requestCtx)
+    isPlaced, err := r.TryExec(ctx, call)
+    cancel()
+
+    if !isPlaced {
+
+        // Too Busy is the super common case, so we track it separately.
+        if err == models.ErrCallTimeoutServerBusy {
+            stats.Record(tr.requestCtx, retryTooBusyCountMeasure.M(0))
+        } else {
+            stats.Record(tr.requestCtx, retryErrorCountMeasure.M(0))
+        }
+
+    } else {
+
+        // Only log unusual errors (except for too-busy) for placed (customer-impacting) calls.
+        if err != nil && err != models.ErrCallTimeoutServerBusy {
+            logger := common.Logger(ctx).WithField("runner_addr", r.Address())
+            logger.WithError(err).Errorf("Failed during call placement")
+        }
+
+        if err != nil {
+            stats.Record(tr.requestCtx, placedErrorCountMeasure.M(0))
+        } else {
+            stats.Record(tr.requestCtx, placedOKCountMeasure.M(0))
+        }
+
+        // The call is now committed. In other words, it was 'run'. We are done.
+        tr.isPlaced = true
+    }
+
+    return isPlaced, err
+}
+
+// HandleDone is a cleanup function to cancel pending contexts and to
+// record stats for the placement session.
+func (tr *placerTracker) HandleDone() {
+
+    // Cancel exit path: the client cancelled or timed out.
+    if tr.requestCtx.Err() != nil {
+        stats.Record(tr.requestCtx, cancelCountMeasure.M(0))
+    }
+
+    // This means our placer timed out. We ignore placed (tr.isPlaced) calls,
+    // since we do not check/track the placer ctx timeout if a call was
+    // actually run on a runner. In other words, the placer timeout can be
+    // 10 secs while a call executes for 60 secs in a container.
+    if !tr.isPlaced && tr.placerCtx.Err() != nil {
+        stats.Record(tr.requestCtx, placerTimeoutMeasure.M(0))
+    }
+
+    tr.tracker.finalizeAttempts(tr.isPlaced)
+    tr.cancel()
+}
+
+// RetryAllBackoff blocks until it is time to try the runner list again. Returns
+// false if the placer should stop trying.
+func (tr *placerTracker) RetryAllBackoff(numOfRunners int) bool {
+
+    // The placer is operating on an empty list: no runners
+    // are available. Record it.
+    if numOfRunners == 0 {
+        stats.Record(tr.requestCtx, emptyPoolCountMeasure.M(0))
+    }
+
+    select {
+    case <-tr.requestCtx.Done(): // client side timeout/cancel
+        return false
+    case <-tr.placerCtx.Done(): // placer wait timeout
+        return false
+    case <-time.After(tr.cfg.RetryAllDelay):
+    }
+
+    return true
+}
diff --git a/api/server/server.go b/api/server/server.go
index eddda341e..2b407103e 100644
--- a/api/server/server.go
+++ b/api/server/server.go
@@ -586,12 +586,13 @@ func WithAgentFromEnv() Option {
         }
 
         // Select the placement algorithm
+        placerCfg := pool.NewPlacerConfig()
         var placer pool.Placer
         switch getEnv(EnvLBPlacementAlg, "") {
         case "ch":
-            placer = pool.NewCHPlacer()
+            placer = pool.NewCHPlacer(&placerCfg)
         default:
-            placer = pool.NewNaivePlacer()
+            placer = pool.NewNaivePlacer(&placerCfg)
         }
         keys := []string{"fn_appname", "fn_path"}
diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go
index 5538c421a..62a4e7884 100644
--- a/test/fn-system-tests/system_test.go
+++ b/test/fn-system-tests/system_test.go
@@ -217,7 +217,8 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
     if err != nil {
         return nil, err
     }
-    placer := pool.NewNaivePlacer()
+    placerCfg := pool.NewPlacerConfig()
+    placer := pool.NewNaivePlacer(&placerCfg)
     keys := []string{"fn_appname", "fn_path"}
 
     pool.RegisterPlacerViews(keys)
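Not part of the patch: a minimal usage sketch of the new placer API, mirroring the api/server/server.go hunk above. It assumes the runnerpool package as modified in this diff; the example package name, function name, and the overridden delay/timeout values are hypothetical (the patch itself only uses the defaults from NewPlacerConfig).

package example

import (
    "time"

    pool "github.com/fnproject/fn/api/runnerpool"
)

// newPlacer builds one PlacerConfig and hands it to whichever placer
// the caller selects, the same pattern used in WithAgentFromEnv.
func newPlacer(alg string) pool.Placer {
    cfg := pool.NewPlacerConfig() // defaults: RetryAllDelay=10ms, PlacerTimeout=360s

    // Hypothetical overrides, shown only to illustrate the config fields.
    cfg.RetryAllDelay = 25 * time.Millisecond
    cfg.PlacerTimeout = 60 * time.Second

    switch alg {
    case "ch":
        return pool.NewCHPlacer(&cfg)
    default:
        return pool.NewNaivePlacer(&cfg)
    }
}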