fn: LB ch and naive fixes (#942)

* fn: LB ch and naive fixes

*) Naive is now a naive RR algorithm.
*) Both now checks for ctx/timeout in each attempt.

* fn: test fix
This commit is contained in:
Tolga Ceylan
2018-05-07 11:50:16 -07:00
committed by Reed Allman
parent 878524ea6c
commit f0f9a6d945
3 changed files with 85 additions and 70 deletions

View File

@@ -171,11 +171,11 @@ func TestEnforceTimeoutFromContext(t *testing.T) {
}
}
func TestSpilloverToSecondRunner(t *testing.T) {
func TestRRRunner(t *testing.T) {
placer := pool.NewNaivePlacer()
rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
parallelCalls := 3
parallelCalls := 2
var wg sync.WaitGroup
failures := make(chan error, parallelCalls)
for i := 0; i < parallelCalls; i++ {
@@ -194,8 +194,11 @@ func TestSpilloverToSecondRunner(t *testing.T) {
close(failures)
err := <-failures
if err != nil || rp.runners[1].(*mockRunner).procCalls != 1 {
t.Fatal("Expected spillover to second runner")
if err != nil {
t.Fatalf("Expected no error %s", err.Error())
}
if rp.runners[1].(*mockRunner).procCalls != 1 && rp.runners[0].(*mockRunner).procCalls != 1 {
t.Fatal("Expected rr runner")
}
}

View File

@@ -30,6 +30,12 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
timeout := time.After(call.LbDeadline().Sub(time.Now()))
for {
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
i := int(jumpConsistentHash(sum64, int32(len(runners))))
for j := 0; j < len(runners); j++ {
select {
case <-ctx.Done():
@@ -37,12 +43,8 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
case <-timeout:
return models.ErrCallTimeoutServerBusy
default:
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
i := int(jumpConsistentHash(sum64, int32(len(runners))))
for j := 0; j < len(runners); j++ {
}
r := runners[i]
placed, err := r.TryExec(ctx, call)
@@ -71,7 +73,6 @@ func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall
case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
}
}
}
}
// A Fast, Minimal Memory, Consistent Hash Algorithm:

View File

@@ -2,6 +2,7 @@ package runnerpool
import (
"context"
"sync/atomic"
"time"
"github.com/fnproject/fn/api/common"
@@ -15,28 +16,39 @@ const (
retryWaitInterval = 10 * time.Millisecond
)
type naivePlacer struct{}
type naivePlacer struct {
rrIndex uint64
}
func NewNaivePlacer() Placer {
logrus.Info("Creating new naive runnerpool placer")
return &naivePlacer{}
rrIndex := uint64(time.Now().Nanosecond())
logrus.Infof("Creating new naive runnerpool placer rrIndex=%d", rrIndex)
return &naivePlacer{
rrIndex: rrIndex,
}
}
func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
timeout := time.After(call.LbDeadline().Sub(time.Now()))
for {
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
for j := 0; j < len(runners); j++ {
select {
case <-ctx.Done():
return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
default:
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
for _, r := range runners {
}
i := atomic.AddUint64(&sp.rrIndex, uint64(1))
r := runners[int(i)%len(runners)]
placed, err := r.TryExec(ctx, call)
if err != nil {
logrus.WithError(err).Error("Failed during call placement")
@@ -61,5 +73,4 @@ func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call Runner
case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
}
}
}
}