mirror of https://github.com/fnproject/fn.git
Move ch ring placement back from old FnLB. (#930)
* fn: bring back CH ring placer into FN repo based on original FnLB
* fn: move placement code into runnerpool directory
@@ -13,59 +13,6 @@ import (
 	"github.com/fnproject/fn/fnext"
 )
 
-type naivePlacer struct {
-}
-
-func NewNaivePlacer() pool.Placer {
-	return &naivePlacer{}
-}
-
-func minDuration(f, s time.Duration) time.Duration {
-	if f < s {
-		return f
-	}
-	return s
-}
-
-func (sp *naivePlacer) PlaceCall(rp pool.RunnerPool, ctx context.Context, call pool.RunnerCall) error {
-	timeout := time.After(call.SlotDeadline().Sub(time.Now()))
-
-	for {
-		select {
-		case <-ctx.Done():
-			return models.ErrCallTimeoutServerBusy
-		case <-timeout:
-			return models.ErrCallTimeoutServerBusy
-		default:
-			runners, err := rp.Runners(call)
-			if err != nil {
-				logrus.WithError(err).Error("Failed to find runners for call")
-			} else {
-				for _, r := range runners {
-					placed, err := r.TryExec(ctx, call)
-					if placed {
-						return err
-					}
-				}
-			}
-
-			remaining := call.SlotDeadline().Sub(time.Now())
-			if remaining <= 0 {
-				return models.ErrCallTimeoutServerBusy
-			}
-
-			// backoff
-			select {
-			case <-ctx.Done():
-				return models.ErrCallTimeoutServerBusy
-			case <-timeout:
-				return models.ErrCallTimeoutServerBusy
-			case <-time.After(minDuration(retryWaitInterval, remaining)):
-			}
-		}
-	}
-}
-
 const (
 	runnerReconnectInterval = 5 * time.Second
 	// sleep time to attempt placement across all runners before retrying
@@ -150,7 +150,7 @@ func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxC
 }
 
 func TestOneRunner(t *testing.T) {
-	placer := NewNaivePlacer()
+	placer := pool.NewNaivePlacer()
 	rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
 	call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)}
 	err := placer.PlaceCall(rp, context.Background(), call)
@@ -160,7 +160,7 @@ func TestOneRunner(t *testing.T) {
 }
 
 func TestEnforceTimeoutFromContext(t *testing.T) {
-	placer := NewNaivePlacer()
+	placer := pool.NewNaivePlacer()
 	rp := setupMockRunnerPool([]string{"171.19.0.1"}, 10*time.Millisecond, 5)
 	call := &mockRunnerCall{slotDeadline: time.Now().Add(1 * time.Second)}
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now())
@@ -172,7 +172,7 @@ func TestEnforceTimeoutFromContext(t *testing.T) {
 }
 
 func TestSpilloverToSecondRunner(t *testing.T) {
-	placer := NewNaivePlacer()
+	placer := pool.NewNaivePlacer()
 	rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
 
 	parallelCalls := 3
@@ -200,7 +200,7 @@ func TestSpilloverToSecondRunner(t *testing.T) {
 }
 
 func TestEnforceSlotTimeout(t *testing.T) {
-	placer := NewNaivePlacer()
+	placer := pool.NewNaivePlacer()
 	rp := setupMockRunnerPool([]string{"171.19.0.1", "171.19.0.2"}, 10*time.Millisecond, 2)
 
 	parallelCalls := 5
12 api/common/time_utils.go Normal file
@@ -0,0 +1,12 @@
+package common
+
+import (
+	"time"
+)
+
+func MinDuration(f, s time.Duration) time.Duration {
+	if f < s {
+		return f
+	}
+	return s
+}
87 api/runnerpool/ch_placer.go Normal file
@@ -0,0 +1,87 @@
+/* The consistent hash ring from the original fnlb.
+   The behaviour of this depends on changes to the runner list leaving it relatively stable.
+*/
+package runnerpool
+
+import (
+	"context"
+	"time"
+
+	"github.com/fnproject/fn/api/common"
+	"github.com/fnproject/fn/api/models"
+
+	"github.com/dchest/siphash"
+	"github.com/sirupsen/logrus"
+)
+
+type chPlacer struct{}
+
+func NewCHPlacer() Placer {
+	logrus.Info("Creating new CH runnerpool placer")
+	return &chPlacer{}
+}
+
+// This borrows the CH placement algorithm from the original FNLB.
+// Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use
+// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
+func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
+	// The key is just the path in this case
+	key := call.Model().Path
+	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
+	timeout := time.After(call.SlotDeadline().Sub(time.Now()))
+	for {
+
+		select {
+		case <-ctx.Done():
+			return models.ErrCallTimeoutServerBusy
+		case <-timeout:
+			return models.ErrCallTimeoutServerBusy
+		default:
+			runners, err := rp.Runners(call)
+			if err != nil {
+				logrus.WithError(err).Error("Failed to find runners for call")
+			} else {
+				i := int(jumpConsistentHash(sum64, int32(len(runners))))
+				for j := 0; j < len(runners); j++ {
+					r := runners[i]
+
+					placed, err := r.TryExec(ctx, call)
+					if err != nil {
+						logrus.WithError(err).Error("Failed during call placement")
+					}
+					if placed {
+						return err
+					}
+
+					i = (i + 1) % len(runners)
+				}
+			}
+
+			remaining := call.SlotDeadline().Sub(time.Now())
+			if remaining <= 0 {
+				return models.ErrCallTimeoutServerBusy
+			}
+
+			// backoff
+			select {
+			case <-ctx.Done():
+				return models.ErrCallTimeoutServerBusy
+			case <-timeout:
+				return models.ErrCallTimeoutServerBusy
+			case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
+			}
+		}
+	}
+}
+
+// A Fast, Minimal Memory, Consistent Hash Algorithm:
+// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
+func jumpConsistentHash(key uint64, num_buckets int32) int32 {
+	var b, j int64 = -1, 0
+	for j < int64(num_buckets) {
+		b = j
+		key = key*2862933555777941757 + 1
+		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
+	}
+	return int32(b)
+}
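To make the placement rule above concrete, here is a small standalone sketch (not part of this commit) that maps request paths onto a runner list the same way chPlacer.PlaceCall derives its starting index: SipHash over the path with the same fixed key, then jump consistent hashing over the runner count. The paths and runner addresses are invented for the demo.

package main

import (
	"fmt"

	"github.com/dchest/siphash"
)

// jumpConsistentHash reproduced verbatim from ch_placer.go above.
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(num_buckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

func main() {
	runners := []string{"171.19.0.1", "171.19.0.2", "171.19.0.3"}
	for _, path := range []string{"/hello", "/resize", "/hello/world"} {
		// Same fixed SipHash key as chPlacer.PlaceCall uses.
		sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(path))
		i := jumpConsistentHash(sum64, int32(len(runners)))
		fmt.Printf("%-14s -> %s\n", path, runners[i])
	}
}

Because jump hashing only remaps the keys that must move when the bucket count changes, growing or shrinking the runner list leaves most paths pinned to their previous runner — the stability property the file's header comment depends on.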
65 api/runnerpool/naive_placer.go Normal file
@@ -0,0 +1,65 @@
+package runnerpool
+
+import (
+	"context"
+	"time"
+
+	"github.com/fnproject/fn/api/common"
+	"github.com/fnproject/fn/api/models"
+
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// sleep time to attempt placement across all runners before retrying
+	retryWaitInterval = 10 * time.Millisecond
+)
+
+type naivePlacer struct{}
+
+func NewNaivePlacer() Placer {
+	logrus.Info("Creating new naive runnerpool placer")
+	return &naivePlacer{}
+}
+
+func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
+	timeout := time.After(call.SlotDeadline().Sub(time.Now()))
+
+	for {
+		select {
+		case <-ctx.Done():
+			return models.ErrCallTimeoutServerBusy
+		case <-timeout:
+			return models.ErrCallTimeoutServerBusy
+		default:
+			runners, err := rp.Runners(call)
+			if err != nil {
+				logrus.WithError(err).Error("Failed to find runners for call")
+			} else {
+				for _, r := range runners {
+					placed, err := r.TryExec(ctx, call)
+					if err != nil {
+						logrus.WithError(err).Error("Failed during call placement")
+					}
+					if placed {
+						return err
+					}
+				}
+			}
+
+			remaining := call.SlotDeadline().Sub(time.Now())
+			if remaining <= 0 {
+				return models.ErrCallTimeoutServerBusy
+			}
+
+			// backoff
+			select {
+			case <-ctx.Done():
+				return models.ErrCallTimeoutServerBusy
+			case <-timeout:
+				return models.ErrCallTimeoutServerBusy
+			case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
+			}
+		}
+	}
+}
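Both placers wrap their runner sweep in the same deadline-and-backoff skeleton: give up when the request context or the slot deadline fires, otherwise sleep min(retryWaitInterval, remaining) and sweep again. A minimal standalone sketch of that shared loop (placeWithBackoff and tryOnce are hypothetical names, not part of the repo):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errBusy stands in for models.ErrCallTimeoutServerBusy.
var errBusy = errors.New("server busy")

// placeWithBackoff distills the loop shared by naivePlacer and chPlacer;
// tryOnce stands in for one sweep over the runners returned by rp.Runners.
func placeWithBackoff(ctx context.Context, slotDeadline time.Time, retryWait time.Duration, tryOnce func() bool) error {
	timeout := time.After(time.Until(slotDeadline))
	for {
		select {
		case <-ctx.Done():
			return errBusy
		case <-timeout:
			return errBusy
		default:
			if tryOnce() {
				return nil
			}
			remaining := time.Until(slotDeadline)
			if remaining <= 0 {
				return errBusy
			}
			// backoff, but never sleep past the context or the slot deadline
			wait := retryWait
			if remaining < wait {
				wait = remaining
			}
			select {
			case <-ctx.Done():
				return errBusy
			case <-timeout:
				return errBusy
			case <-time.After(wait):
			}
		}
	}
}

func main() {
	attempts := 0
	err := placeWithBackoff(context.Background(), time.Now().Add(100*time.Millisecond), 10*time.Millisecond,
		func() bool { attempts++; return attempts == 3 }) // capacity frees up on the third sweep
	fmt.Println("attempts:", attempts, "err:", err)
}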
@@ -63,6 +63,7 @@ const (
 	EnvCertAuth = "FN_NODE_CERT_AUTHORITY"
 
 	EnvProcessCollectorList = "FN_PROCESS_COLLECTOR_LIST"
+	EnvLBPlacementAlg = "FN_PLACER"
 
 	// Defaults
 	DefaultLogLevel = "info"
@@ -357,10 +358,6 @@ func (s *Server) defaultRunnerPool() (pool.RunnerPool, error) {
 	return agent.DefaultStaticRunnerPool(strings.Split(runnerAddresses, ",")), nil
 }
 
-func (s *Server) defaultPlacer() pool.Placer {
-	return agent.NewNaivePlacer()
-}
-
 func WithLogstoreFromDatastore() ServerOption {
 	return func(ctx context.Context, s *Server) error {
 		if s.datastore == nil {
@@ -446,7 +443,15 @@ func WithAgentFromEnv() ServerOption {
 		if err != nil {
 			return err
 		}
-		placer := s.defaultPlacer()
+
+		// Select the placement algorithm
+		var placer pool.Placer
+		switch getEnv(EnvLBPlacementAlg, "") {
+		case "ch":
+			placer = pool.NewCHPlacer()
+		default:
+			placer = pool.NewNaivePlacer()
+		}
 
 		s.agent, err = agent.NewLBAgent(agent.NewCachedDataAccess(cl), runnerPool, placer)
 		if err != nil {
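Placer selection is driven entirely by the FN_PLACER environment variable read above. A minimal sketch of the same rule in isolation (the import alias and the print are illustrative only):

package main

import (
	"fmt"
	"os"

	pool "github.com/fnproject/fn/api/runnerpool"
)

func main() {
	// Mirrors the switch in WithAgentFromEnv: FN_PLACER=ch selects the
	// consistent-hash placer; anything else falls back to the naive one.
	var placer pool.Placer
	switch os.Getenv("FN_PLACER") {
	case "ch":
		placer = pool.NewCHPlacer()
	default:
		placer = pool.NewNaivePlacer()
	}
	fmt.Printf("using placer: %T\n", placer)
}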