From f0f9a6d945978766dbff621ff37057cda45f4172 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 7 May 2018 11:50:16 -0700 Subject: [PATCH] fn: LB ch and naive fixes (#942) * fn: LB ch and naive fixes *) Naive is now a naive round-robin (RR) algorithm. *) Both now check for ctx/timeout in each attempt. * fn: test fix --- api/agent/lb_agent_test.go | 11 +++-- api/runnerpool/ch_placer.go | 69 ++++++++++++++++--------------- api/runnerpool/naive_placer.go | 75 +++++++++++++++++++--------------- 3 files changed, 85 insertions(+), 70 deletions(-) diff --git a/api/agent/lb_agent_test.go b/api/agent/lb_agent_test.go index c34382c5f..637e03bb2 100644 --- a/api/agent/lb_agent_test.go +++ b/api/agent/lb_agent_test.go @@ -171,11 +171,11 @@ func TestEnforceTimeoutFromContext(t *testing.T) { } } -func TestSpilloverToSecondRunner(t *testing.T) { +func TestRRRunner(t *testing.T) { placer := pool.NewNaivePlacer() rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2) - parallelCalls := 3 + parallelCalls := 2 var wg sync.WaitGroup failures := make(chan error, parallelCalls) for i := 0; i < parallelCalls; i++ { @@ -194,8 +194,11 @@ func TestSpilloverToSecondRunner(t *testing.T) { close(failures) err := <-failures - if err != nil || rp.runners[1].(*mockRunner).procCalls != 1 { - t.Fatal("Expected spillover to second runner") + if err != nil { + t.Fatalf("Expected no error %s", err.Error()) + } + if rp.runners[1].(*mockRunner).procCalls != 1 && rp.runners[0].(*mockRunner).procCalls != 1 { + t.Fatal("Expected rr runner") } } diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index 0ef031eae..e368dcb97 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -30,46 +30,47 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) timeout := time.After(call.LbDeadline().Sub(time.Now())) for { + runners, err := rp.Runners(call) + if err != nil 
{ + logrus.WithError(err).Error("Failed to find runners for call") + } else { + i := int(jumpConsistentHash(sum64, int32(len(runners)))) + for j := 0; j < len(runners); j++ { + select { + case <-ctx.Done(): + return models.ErrCallTimeoutServerBusy + case <-timeout: + return models.ErrCallTimeoutServerBusy + default: + } + + r := runners[i] + + placed, err := r.TryExec(ctx, call) + if err != nil { + logrus.WithError(err).Error("Failed during call placement") + } + if placed { + return err + } + + i = (i + 1) % len(runners) + } + } + + remaining := call.LbDeadline().Sub(time.Now()) + if remaining <= 0 { + return models.ErrCallTimeoutServerBusy + } + + // backoff select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy case <-timeout: return models.ErrCallTimeoutServerBusy - default: - runners, err := rp.Runners(call) - if err != nil { - logrus.WithError(err).Error("Failed to find runners for call") - } else { - i := int(jumpConsistentHash(sum64, int32(len(runners)))) - for j := 0; j < len(runners); j++ { - r := runners[i] - - placed, err := r.TryExec(ctx, call) - if err != nil { - logrus.WithError(err).Error("Failed during call placement") - } - if placed { - return err - } - - i = (i + 1) % len(runners) - } - } - - remaining := call.LbDeadline().Sub(time.Now()) - if remaining <= 0 { - return models.ErrCallTimeoutServerBusy - } - - // backoff - select { - case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy - case <-time.After(common.MinDuration(retryWaitInterval, remaining)): - } + case <-time.After(common.MinDuration(retryWaitInterval, remaining)): } } } diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index 65ca543e0..9730f833e 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -2,6 +2,7 @@ package runnerpool import ( "context" + "sync/atomic" "time" "github.com/fnproject/fn/api/common" @@ -15,51 +16,61 @@ const ( 
retryWaitInterval = 10 * time.Millisecond ) -type naivePlacer struct{} +type naivePlacer struct { + rrIndex uint64 +} func NewNaivePlacer() Placer { - logrus.Info("Creating new naive runnerpool placer") - return &naivePlacer{} + rrIndex := uint64(time.Now().Nanosecond()) + logrus.Infof("Creating new naive runnerpool placer rrIndex=%d", rrIndex) + return &naivePlacer{ + rrIndex: rrIndex, + } } func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { timeout := time.After(call.LbDeadline().Sub(time.Now())) for { + runners, err := rp.Runners(call) + if err != nil { + logrus.WithError(err).Error("Failed to find runners for call") + } else { + for j := 0; j < len(runners); j++ { + + select { + case <-ctx.Done(): + return models.ErrCallTimeoutServerBusy + case <-timeout: + return models.ErrCallTimeoutServerBusy + default: + } + + i := atomic.AddUint64(&sp.rrIndex, uint64(1)) + r := runners[int(i)%len(runners)] + + placed, err := r.TryExec(ctx, call) + if err != nil { + logrus.WithError(err).Error("Failed during call placement") + } + if placed { + return err + } + } + } + + remaining := call.LbDeadline().Sub(time.Now()) + if remaining <= 0 { + return models.ErrCallTimeoutServerBusy + } + + // backoff select { case <-ctx.Done(): return models.ErrCallTimeoutServerBusy case <-timeout: return models.ErrCallTimeoutServerBusy - default: - runners, err := rp.Runners(call) - if err != nil { - logrus.WithError(err).Error("Failed to find runners for call") - } else { - for _, r := range runners { - placed, err := r.TryExec(ctx, call) - if err != nil { - logrus.WithError(err).Error("Failed during call placement") - } - if placed { - return err - } - } - } - - remaining := call.LbDeadline().Sub(time.Now()) - if remaining <= 0 { - return models.ErrCallTimeoutServerBusy - } - - // backoff - select { - case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy - case <-timeout: - return models.ErrCallTimeoutServerBusy - case 
<-time.After(common.MinDuration(retryWaitInterval, remaining)): - } + case <-time.After(common.MinDuration(retryWaitInterval, remaining)): } } }