fn: New timeout for LB Placer (#1137)

* fn: New timeout for LB Placer

Previously, LB Placers worked hard as long as
client contexts allowed for. Adding a Placer
config setting to bound this by 360 seconds by
default.

The new timeout is not accounted during actual
function execution and only applies to the amount
of wait time in Placers when the call is not
being executed.
This commit is contained in:
Tolga Ceylan
2018-07-26 10:19:25 -07:00
committed by GitHub
parent f7266c4f19
commit 9f29d824d6
8 changed files with 185 additions and 113 deletions

View File

@@ -158,7 +158,8 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC
} }
func TestOneRunner(t *testing.T) { func TestOneRunner(t *testing.T) {
placer := pool.NewNaivePlacer() cfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&cfg)
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
call := &mockRunnerCall{} call := &mockRunnerCall{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second)) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
@@ -170,7 +171,8 @@ func TestOneRunner(t *testing.T) {
} }
func TestEnforceTimeoutFromContext(t *testing.T) { func TestEnforceTimeoutFromContext(t *testing.T) {
placer := pool.NewNaivePlacer() cfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&cfg)
rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5) rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
call := &mockRunnerCall{} call := &mockRunnerCall{}
ctx, cancel := context.WithDeadline(context.Background(), time.Now()) ctx, cancel := context.WithDeadline(context.Background(), time.Now())
@@ -182,7 +184,8 @@ func TestEnforceTimeoutFromContext(t *testing.T) {
} }
func TestRRRunner(t *testing.T) { func TestRRRunner(t *testing.T) {
placer := pool.NewNaivePlacer() cfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&cfg)
rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2) rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
parallelCalls := 2 parallelCalls := 2
@@ -215,7 +218,8 @@ func TestRRRunner(t *testing.T) {
} }
func TestEnforceLbTimeout(t *testing.T) { func TestEnforceLbTimeout(t *testing.T) {
placer := pool.NewNaivePlacer() cfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&cfg)
rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1) rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 1)
parallelCalls := 5 parallelCalls := 5

View File

@@ -5,24 +5,21 @@ package runnerpool
import ( import (
"context" "context"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
"github.com/dchest/siphash" "github.com/dchest/siphash"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.opencensus.io/stats"
) )
type chPlacer struct { type chPlacer struct {
rrInterval time.Duration cfg PlacerConfig
} }
func NewCHPlacer() Placer { func NewCHPlacer(cfg *PlacerConfig) Placer {
logrus.Info("Creating new CH runnerpool placer") logrus.Infof("Creating new CH runnerpool placer with config=%+v", cfg)
return &chPlacer{ return &chPlacer{
rrInterval: 10 * time.Millisecond, cfg: *cfg,
} }
} }
@@ -31,76 +28,38 @@ func NewCHPlacer() Placer {
// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it. // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
tracker := newAttemptTracker(ctx) state := NewPlacerTracker(ctx, &p.cfg)
log := common.Logger(ctx) defer state.HandleDone()
// The key is just the path in this case // The key is just the path in this case
key := call.Model().Path key := call.Model().Path
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
OutTries:
for { for {
runners, err := rp.Runners(call) runners, err := rp.Runners(call)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to find runners for call") state.HandleFindRunnersFailure(err)
stats.Record(ctx, errorPoolCountMeasure.M(0))
tracker.finalizeAttempts(false)
return err return err
} }
i := int(jumpConsistentHash(sum64, int32(len(runners)))) i := int(jumpConsistentHash(sum64, int32(len(runners))))
for j := 0; j < len(runners); j++ { for j := 0; j < len(runners) && !state.IsDone(); j++ {
if ctx.Err() != nil {
break OutTries
}
r := runners[i] r := runners[i]
tracker.recordAttempt() placed, err := state.TryRunner(r, call)
tryCtx, tryCancel := context.WithCancel(ctx)
placed, err := r.TryExec(tryCtx, call)
tryCancel()
// Only log unusual (except for too-busy) errors
if err != nil && err != models.ErrCallTimeoutServerBusy {
log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
}
if placed { if placed {
if err != nil {
stats.Record(ctx, placedErrorCountMeasure.M(0))
} else {
stats.Record(ctx, placedOKCountMeasure.M(0))
}
tracker.finalizeAttempts(true)
return err return err
} }
i = (i + 1) % len(runners) i = (i + 1) % len(runners)
}
// Too Busy is super common case, we track it separately if !state.RetryAllBackoff(len(runners)) {
if err == models.ErrCallTimeoutServerBusy { break
stats.Record(ctx, retryTooBusyCountMeasure.M(0))
} else {
stats.Record(ctx, retryErrorCountMeasure.M(0))
} }
} }
if len(runners) == 0 {
stats.Record(ctx, emptyPoolCountMeasure.M(0))
}
// backoff
select {
case <-ctx.Done():
break OutTries
case <-time.After(p.rrInterval):
}
}
// Cancel Exit Path / Client cancelled/timedout
stats.Record(ctx, cancelCountMeasure.M(0))
tracker.finalizeAttempts(false)
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
} }

View File

@@ -5,92 +5,51 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models" "github.com/fnproject/fn/api/models"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.opencensus.io/stats"
) )
type naivePlacer struct { type naivePlacer struct {
rrInterval time.Duration cfg PlacerConfig
rrIndex uint64 rrIndex uint64
} }
func NewNaivePlacer() Placer { func NewNaivePlacer(cfg *PlacerConfig) Placer {
rrIndex := uint64(time.Now().Nanosecond()) logrus.Infof("Creating new naive runnerpool placer with config=%+v", cfg)
logrus.Infof("Creating new naive runnerpool placer rrIndex=%d", rrIndex)
return &naivePlacer{ return &naivePlacer{
rrInterval: 10 * time.Millisecond, cfg: *cfg,
rrIndex: rrIndex, rrIndex: uint64(time.Now().Nanosecond()),
} }
} }
func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
tracker := newAttemptTracker(ctx) state := NewPlacerTracker(ctx, &sp.cfg)
log := common.Logger(ctx) defer state.HandleDone()
OutTries:
for { for {
runners, err := rp.Runners(call) runners, err := rp.Runners(call)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to find runners for call") state.HandleFindRunnersFailure(err)
stats.Record(ctx, errorPoolCountMeasure.M(0))
tracker.finalizeAttempts(false)
return err return err
} }
for j := 0; j < len(runners); j++ { for j := 0; j < len(runners) && !state.IsDone(); j++ {
if ctx.Err() != nil {
break OutTries
}
i := atomic.AddUint64(&sp.rrIndex, uint64(1)) i := atomic.AddUint64(&sp.rrIndex, uint64(1))
r := runners[int(i)%len(runners)] r := runners[int(i)%len(runners)]
tracker.recordAttempt() placed, err := state.TryRunner(r, call)
tryCtx, tryCancel := context.WithCancel(ctx)
placed, err := r.TryExec(tryCtx, call)
tryCancel()
// Only log unusual (except for too-busy) errors
if err != nil && err != models.ErrCallTimeoutServerBusy {
log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
}
if placed { if placed {
if err != nil {
stats.Record(ctx, placedErrorCountMeasure.M(0))
} else {
stats.Record(ctx, placedOKCountMeasure.M(0))
}
tracker.finalizeAttempts(true)
return err return err
} }
}
// Too Busy is super common case, we track it separately if !state.RetryAllBackoff(len(runners)) {
if err == models.ErrCallTimeoutServerBusy { break
stats.Record(ctx, retryTooBusyCountMeasure.M(0))
} else {
stats.Record(ctx, retryErrorCountMeasure.M(0))
} }
} }
if len(runners) == 0 {
stats.Record(ctx, emptyPoolCountMeasure.M(0))
}
// backoff
select {
case <-ctx.Done():
break OutTries
case <-time.After(sp.rrInterval):
}
}
// Cancel Exit Path / Client cancelled/timedout
stats.Record(ctx, cancelCountMeasure.M(0))
tracker.finalizeAttempts(false)
return models.ErrCallTimeoutServerBusy return models.ErrCallTimeoutServerBusy
} }

View File

@@ -0,0 +1,21 @@
package runnerpool
import (
"time"
)
// Common config for placers.
type PlacerConfig struct {
// After all runners in the runner list is tried, apply a delay before retrying.
RetryAllDelay time.Duration `json:"retry_all_delay"`
// Maximum amount of time a placer can hold a request during runner attempts
PlacerTimeout time.Duration `json:"placer_timeout"`
}
func NewPlacerConfig() PlacerConfig {
return PlacerConfig{
RetryAllDelay: 10 * time.Millisecond,
PlacerTimeout: 360 * time.Second,
}
}

View File

@@ -16,6 +16,7 @@ var (
errorPoolCountMeasure = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "") errorPoolCountMeasure = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "")
emptyPoolCountMeasure = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "") emptyPoolCountMeasure = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "")
cancelCountMeasure = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "") cancelCountMeasure = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "")
placerTimeoutMeasure = stats.Int64("lb_placer_timeout_count", "LB Placer Timeout Count", "")
placedErrorCountMeasure = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "") placedErrorCountMeasure = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "")
placedOKCountMeasure = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "") placedOKCountMeasure = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "")
retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "") retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "")
@@ -91,6 +92,7 @@ func RegisterPlacerViews(tagKeys []string) {
createView(errorPoolCountMeasure, view.Count(), tagKeys), createView(errorPoolCountMeasure, view.Count(), tagKeys),
createView(emptyPoolCountMeasure, view.Count(), tagKeys), createView(emptyPoolCountMeasure, view.Count(), tagKeys),
createView(cancelCountMeasure, view.Count(), tagKeys), createView(cancelCountMeasure, view.Count(), tagKeys),
createView(placerTimeoutMeasure, view.Count(), tagKeys),
createView(placedErrorCountMeasure, view.Count(), tagKeys), createView(placedErrorCountMeasure, view.Count(), tagKeys),
createView(placedOKCountMeasure, view.Count(), tagKeys), createView(placedOKCountMeasure, view.Count(), tagKeys),
createView(retryTooBusyCountMeasure, view.Count(), tagKeys), createView(retryTooBusyCountMeasure, view.Count(), tagKeys),

View File

@@ -0,0 +1,125 @@
package runnerpool
import (
"context"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"go.opencensus.io/stats"
)
type placerTracker struct {
cfg *PlacerConfig
requestCtx context.Context
placerCtx context.Context
cancel context.CancelFunc
tracker *attemptTracker
isPlaced bool
}
func NewPlacerTracker(requestCtx context.Context, cfg *PlacerConfig) *placerTracker {
ctx, cancel := context.WithTimeout(context.Background(), cfg.PlacerTimeout)
return &placerTracker{
cfg: cfg,
requestCtx: requestCtx,
placerCtx: ctx,
cancel: cancel,
tracker: newAttemptTracker(requestCtx),
}
}
// IsDone is a non-blocking check to see if the underlying deadlines are exceeded.
func (tr *placerTracker) IsDone() bool {
return tr.requestCtx.Err() != nil || tr.placerCtx.Err() != nil
}
// HandleFindRunnersFailure is a convenience function to record error from runnerpool.Runners()
func (tr *placerTracker) HandleFindRunnersFailure(err error) {
common.Logger(tr.requestCtx).WithError(err).Error("Failed to find runners for call")
stats.Record(tr.requestCtx, errorPoolCountMeasure.M(0))
}
// TryRunner is a convenience function to TryExec a call on a runner and
// analyze the results.
func (tr *placerTracker) TryRunner(r Runner, call RunnerCall) (bool, error) {
tr.tracker.recordAttempt()
// WARNING: Do not use placerCtx here to let requestCtx take its time
// during container execution.
ctx, cancel := context.WithCancel(tr.requestCtx)
isPlaced, err := r.TryExec(ctx, call)
cancel()
if !isPlaced {
// Too Busy is super common case, we track it separately
if err == models.ErrCallTimeoutServerBusy {
stats.Record(tr.requestCtx, retryTooBusyCountMeasure.M(0))
} else {
stats.Record(tr.requestCtx, retryErrorCountMeasure.M(0))
}
} else {
// Only log unusual (except for too-busy) errors for isPlaced (customer impacting) calls
if err != nil && err != models.ErrCallTimeoutServerBusy {
logger := common.Logger(ctx).WithField("runner_addr", r.Address())
logger.WithError(err).Errorf("Failed during call placement")
}
if err != nil {
stats.Record(tr.requestCtx, placedErrorCountMeasure.M(0))
} else {
stats.Record(tr.requestCtx, placedOKCountMeasure.M(0))
}
// Call is now committed. In other words, it was 'run'. We are done.
tr.isPlaced = true
}
return isPlaced, err
}
// HandleDone is cleanup function to cancel pending contexts and to
// record stats for the placement session.
func (tr *placerTracker) HandleDone() {
// Cancel Exit Path / Client cancelled/timedout
if tr.requestCtx.Err() != nil {
stats.Record(tr.requestCtx, cancelCountMeasure.M(0))
}
// This means our placer timed out. We ignore tr.isPlaced calls
// since we do not check/track placer ctx timeout if a call was
// actually ran on a runner. This means, placer timeout can be
// 10 secs, but a call can execute for 60 secs in a container.
if !tr.isPlaced && tr.placerCtx.Err() != nil {
stats.Record(tr.requestCtx, placerTimeoutMeasure.M(0))
}
tr.tracker.finalizeAttempts(tr.isPlaced)
tr.cancel()
}
// RetryAllBackoff blocks until it is time to try the runner list again. Returns
// false if the placer should stop trying.
func (tr *placerTracker) RetryAllBackoff(numOfRunners int) bool {
// This means Placer is operating on an empty list. No runners
// available. Record it.
if numOfRunners == 0 {
stats.Record(tr.requestCtx, emptyPoolCountMeasure.M(0))
}
select {
case <-tr.requestCtx.Done(): // client side timeout/cancel
return false
case <-tr.placerCtx.Done(): // placer wait timeout
return false
case <-time.After(tr.cfg.RetryAllDelay):
}
return true
}

View File

@@ -586,12 +586,13 @@ func WithAgentFromEnv() Option {
} }
// Select the placement algorithm // Select the placement algorithm
placerCfg := pool.NewPlacerConfig()
var placer pool.Placer var placer pool.Placer
switch getEnv(EnvLBPlacementAlg, "") { switch getEnv(EnvLBPlacementAlg, "") {
case "ch": case "ch":
placer = pool.NewCHPlacer() placer = pool.NewCHPlacer(&placerCfg)
default: default:
placer = pool.NewNaivePlacer() placer = pool.NewNaivePlacer(&placerCfg)
} }
keys := []string{"fn_appname", "fn_path"} keys := []string{"fn_appname", "fn_path"}

View File

@@ -217,7 +217,8 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
placer := pool.NewNaivePlacer() placerCfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&placerCfg)
keys := []string{"fn_appname", "fn_path"} keys := []string{"fn_appname", "fn_path"}
pool.RegisterPlacerViews(keys) pool.RegisterPlacerViews(keys)