Files
fn-serverless/api/runnerpool/ch_placer.go
Tolga Ceylan f24172aa9d fn: introducing lb placer basic metrics (#1058)
* fn: introducing lb placer basic metrics

This change adds basic metrics to naive and consistent
hash LB placers. The stats show how many times we scanned
the full runner list, if runner pool failed to return a
runner list or if runner pool returned an empty list.

Placed and not-placed statuses are also tracked, along with
whether TryExec returned an error or not. The most common
error code, Too-Busy, is specifically tracked.

If client cancels/times out, this is also tracked as
a client cancel metric.

For placer latency, we would like to know how much time
the placer spent on searching for a runner until it
successfully places a call. This includes round-trip
times for NACK responses from the runners until a successful
TryExec() call. By excluding last successful TryExec() latency,
we try to exclude function execution & runner container
startup time from this metric in an attempt to isolate
Placer only latency.

* fn: latency and attempt tracker

Removing full scan metric. Tracking number of
runners attempted is a better metric for this
purpose.

Also, if rp.Runners() fails, this is an unrecoverable
error and we should bail out instead of retrying.

* fn: typo fix, ch placer finalize err return

* fn: enable LB placer metrics in WithAgentFromEnv if prometheus is enabled
2018-06-12 13:36:05 -07:00

118 lines
2.9 KiB
Go

/* The consistent hash ring from the original fnlb.
The behaviour of this depends on changes to the runner list leaving it relatively stable.
*/
package runnerpool
import (
"context"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/dchest/siphash"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
)
// chPlacer places calls onto runners chosen by a consistent hash of the
// call's path, so placement stays relatively stable as long as the runner
// list does (see the package comment above).
type chPlacer struct {
	// rrInterval is the backoff between full passes over the runner list
	// when no runner accepts the call.
	rrInterval time.Duration
}
// NewCHPlacer returns a Placer that uses consistent hashing to select
// runners, backing off 10ms between full passes over the runner list.
func NewCHPlacer() Placer {
	logrus.Info("Creating new CH runnerpool placer")
	placer := new(chPlacer)
	placer.rrInterval = 10 * time.Millisecond
	return placer
}
// This borrows the CH placement algorithm from the original FNLB.
// Because we ask a runner to accept load (queuing on the LB rather than on the nodes), we don't use
// the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it.
//
// PlaceCall hashes the call's path (siphash) onto the current runner list
// with jump consistent hash, then probes runners from that index onward.
// A runner that declines (placed == false) advances the probe; after a full
// pass with no placement we back off for rrInterval and refresh the list.
// The loop ends when the call is placed, the runner pool errors out, or
// the client's context is cancelled/times out.
func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error {
	tracker := newAttemptTracker(ctx)
	log := common.Logger(ctx)
	// The key is just the path in this case
	key := call.Model().Path
	sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key))
OutTries:
	for {
		runners, err := rp.Runners(call)
		if err != nil {
			// An unrecoverable pool error: bail out rather than retry.
			log.WithError(err).Error("Failed to find runners for call")
			stats.Record(ctx, errorPoolCountMeasure.M(0))
			tracker.finalizeAttempts(false)
			return err
		}
		// Starting probe index for this key in the current runner list.
		// (With an empty list this yields -1, but the inner loop then runs
		// zero times, so the value is never used as an index.)
		i := int(jumpConsistentHash(sum64, int32(len(runners))))
		for j := 0; j < len(runners); j++ {
			if ctx.Err() != nil {
				break OutTries
			}
			r := runners[i]
			tracker.recordAttempt()
			// Per-attempt child context so a declined attempt's resources
			// are released immediately.
			tryCtx, tryCancel := context.WithCancel(ctx)
			placed, err := r.TryExec(tryCtx, call)
			tryCancel()
			// Only log unusual (except for too-busy) errors
			if err != nil && err != models.ErrCallTimeoutServerBusy {
				log.WithError(err).Errorf("Failed during call placement, placed=%v", placed)
			}
			if placed {
				// Placed: record outcome and propagate TryExec's error (if
				// any) to the caller. The tracker presumably excludes this
				// final TryExec round-trip from placer latency — see
				// attempt tracker for details.
				if err != nil {
					stats.Record(ctx, placedErrorCountMeasure.M(0))
				} else {
					stats.Record(ctx, placedOKCountMeasure.M(0))
				}
				tracker.finalizeAttempts(true)
				return err
			}
			// Not placed: move to the next runner (wrapping around).
			i = (i + 1) % len(runners)
			// Too Busy is super common case, we track it separately
			if err == models.ErrCallTimeoutServerBusy {
				stats.Record(ctx, retryTooBusyCountMeasure.M(0))
			} else {
				stats.Record(ctx, retryErrorCountMeasure.M(0))
			}
		}
		if len(runners) == 0 {
			stats.Record(ctx, emptyPoolCountMeasure.M(0))
		}
		// backoff
		select {
		case <-ctx.Done():
			break OutTries
		case <-time.After(p.rrInterval):
		}
	}
	// Cancel Exit Path / Client cancelled/timedout
	stats.Record(ctx, cancelCountMeasure.M(0))
	tracker.finalizeAttempts(false)
	return models.ErrCallTimeoutServerBusy
}
// A Fast, Minimal Memory, Consistent Hash Algorithm:
// https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf
//
// jumpConsistentHash maps key onto a bucket in [0, num_buckets) such that
// growing the bucket count from n to n+1 only ever moves a key to the new
// bucket n (never between existing buckets). Returns -1 when num_buckets
// is not positive.
//
// Fix: the previous integer expression (1<<31)/(key>>33)+1 panicked with a
// divide-by-zero whenever key>>33 == 0 (e.g. for key 0, whose first mixed
// value is 1) and diverged from the paper's formula. The paper computes
// j = floor((b+1) * (2^31 / ((key>>33)+1))) in floating point, with the
// +1 inside the divisor.
func jumpConsistentHash(key uint64, num_buckets int32) int32 {
	var b, j int64 = -1, 0
	for j < int64(num_buckets) {
		b = j
		// 64-bit LCG step from the paper.
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}