Files
fn-serverless/api/runnerpool/ch_placer.go
Tolga Ceylan 4ccde8897e fn: lb and pure-runner with non-blocking agent (#989)
* fn: lb and pure-runner with non-blocking agent

*) Removed pure-runner capacity tracking code. This did
not play well with internal agent resource tracker.
*) In LB and runner gRPC comm, removed ACK. Now,
upon TryCall, pure-runner quickly proceeds to call
Submit. This is good since at this stage pure-runner
already has all relevant data to initiate the call.
*) Unless pure-runner emits a NACK, LB immediately
streams http body to runners.
*) For retriable requests added a CachedReader for
http.Request Body.
*) Idempotenty/retry is similar to previous code.
After initial success in Engament, after attempting
a TryCall, unless we receive NACK, we cannot retry
that call.
*) ch and naive places now wraps each TryExec with
a cancellable context to clean up gRPC contexts
quicker.

* fn: err for simpler one-time read GetBody approach

This allows for a more flexible approach since we let
users to define GetBody() to allow repetitive http body
read. In default LB case, LB executes a one-time io.ReadAll
and sets of GetBody, which is detected by RunnerCall.RequestBody().

* fn: additional check for non-nil req.body

* fn: attempt to override IO errors with ctx for TryExec

* fn: system-tests log dest

* fn: LB: EOF send handling

* fn: logging for partial IO

* fn: use buffer pool for IO storage in lb agent

* fn: pure runner should use chunks for data msgs

* fn: required config validations and pass APIErrors

* fn: additional tests and gRPC proto simplification

*) remove ACK/NACK messages as Finish message type works
OK for this purpose.
*) return resp in api tests for check for status code
*) empty body json test in api tests for lb & pure-runner

* fn: buffer adjustments

*) setRequestBody result handling correction
*) switch to bytes.Reader for read-only safety
*) io.EOF can be returned for non-nil Body in request.

* fn: clarify detection of 503 / Server Too Busy
2018-05-17 12:09:03 -07:00

92 lines
2.3 KiB
Go

/* The consistent hash ring from the original fnlb.
The behaviour of this depends on changes to the runner list leaving it relatively stable.
*/
package runnerpool
import (
"context"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/dchest/siphash"
"github.com/sirupsen/logrus"
)
type chPlacer struct{}
func NewCHPlacer() Placer {
logrus.Info("Creating new CH runnerpool placer")
return &chPlacer{}
}
// This borrows the CH placement algorithm from the original FNLB.
// Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use
// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
// The key is just the path in this case
key := call.Model().Path
sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
timeout := time.After(call.LbDeadline().Sub(time.Now()))
for {
runners, err := rp.Runners(call)
if err != nil {
logrus.WithError(err).Error("Failed to find runners for call")
} else {
i := int(jumpConsistentHash(sum64, int32(len(runners))))
for j := 0; j < len(runners); j++ {
select {
case <-ctx.Done():
return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
default:
}
r := runners[i]
tryCtx, tryCancel := context.WithCancel(ctx)
placed, err := r.TryExec(tryCtx, call)
tryCancel()
if err != nil {
logrus.WithError(err).Error("Failed during call placement")
}
if placed {
return err
}
i = (i + 1) % len(runners)
}
}
remaining := call.LbDeadline().Sub(time.Now())
if remaining <= 0 {
return models.ErrCallTimeoutServerBusy
}
// backoff
select {
case <-ctx.Done():
return models.ErrCallTimeoutServerBusy
case <-timeout:
return models.ErrCallTimeoutServerBusy
case <-time.After(common.MinDuration(retryWaitInterval, remaining)):
}
}
}
// A Fast, Minimal Memory, Consistent Hash Algorithm:
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
var b, j int64 = -1, 0
for j < int64(num_buckets) {
b = j
key = key*2862933555777941757 + 1
j = (b + 1) * int64((1<<31)/(key>>33)+1)
}
return int32(b)
}