diff --git a/api/runnerpool/ch_placer.go b/api/runnerpool/ch_placer.go index 4f3476f3b..45fe91345 100644 --- a/api/runnerpool/ch_placer.go +++ b/api/runnerpool/ch_placer.go @@ -7,11 +7,12 @@ import ( "context" "time" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" "github.com/dchest/siphash" - "github.com/fnproject/fn/api/common" "github.com/sirupsen/logrus" + "go.opencensus.io/stats" ) type chPlacer struct { @@ -30,50 +31,77 @@ func NewCHPlacer() Placer { // the LB_WAIT to drive placement decisions: runners only accept work if they have the capacity for it. func (p *chPlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { + tracker := newAttemptTracker(ctx) log := common.Logger(ctx) // The key is just the path in this case key := call.Model().Path sum64 := siphash.Hash(0, 0x4c617279426f6174, []byte(key)) +OutTries: for { runners, err := rp.Runners(call) if err != nil { log.WithError(err).Error("Failed to find runners for call") - } else { - i := int(jumpConsistentHash(sum64, int32(len(runners)))) - for j := 0; j < len(runners); j++ { + stats.Record(ctx, errorPoolCountMeasure.M(0)) + tracker.finalizeAttempts(false) + return err + } - select { - case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy - default: - } - - r := runners[i] - - tryCtx, tryCancel := context.WithCancel(ctx) - placed, err := r.TryExec(tryCtx, call) - tryCancel() - - if err != nil && err != models.ErrCallTimeoutServerBusy { - log.WithError(err).Error("Failed during call placement") - } - if placed { - return err - } - - i = (i + 1) % len(runners) + i := int(jumpConsistentHash(sum64, int32(len(runners)))) + for j := 0; j < len(runners); j++ { + if ctx.Err() != nil { + break OutTries } + + r := runners[i] + + tracker.recordAttempt() + tryCtx, tryCancel := context.WithCancel(ctx) + placed, err := r.TryExec(tryCtx, call) + tryCancel() + + // Only log unusual (except for too-busy) errors + if err != nil && err != 
models.ErrCallTimeoutServerBusy { + log.WithError(err).Errorf("Failed during call placement, placed=%v", placed) + } + + if placed { + if err != nil { + stats.Record(ctx, placedErrorCountMeasure.M(0)) + } else { + stats.Record(ctx, placedOKCountMeasure.M(0)) + } + tracker.finalizeAttempts(true) + return err + } + + i = (i + 1) % len(runners) + + // Too Busy is super common case, we track it separately + if err == models.ErrCallTimeoutServerBusy { + stats.Record(ctx, retryTooBusyCountMeasure.M(0)) + } else { + stats.Record(ctx, retryErrorCountMeasure.M(0)) + } + } + + if len(runners) == 0 { + stats.Record(ctx, emptyPoolCountMeasure.M(0)) } // backoff select { case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy + break OutTries case <-time.After(p.rrInterval): } } + + // Cancel Exit Path / Client cancelled/timedout + stats.Record(ctx, cancelCountMeasure.M(0)) + tracker.finalizeAttempts(false) + return models.ErrCallTimeoutServerBusy } // A Fast, Minimal Memory, Consistent Hash Algorithm: diff --git a/api/runnerpool/naive_placer.go b/api/runnerpool/naive_placer.go index c0629005e..ae95d23f9 100644 --- a/api/runnerpool/naive_placer.go +++ b/api/runnerpool/naive_placer.go @@ -5,10 +5,11 @@ import ( "sync/atomic" "time" + "github.com/fnproject/fn/api/common" "github.com/fnproject/fn/api/models" - "github.com/fnproject/fn/api/common" "github.com/sirupsen/logrus" + "go.opencensus.io/stats" ) type naivePlacer struct { @@ -27,41 +28,69 @@ func NewNaivePlacer() Placer { func (sp *naivePlacer) PlaceCall(rp RunnerPool, ctx context.Context, call RunnerCall) error { + tracker := newAttemptTracker(ctx) log := common.Logger(ctx) + +OutTries: for { runners, err := rp.Runners(call) if err != nil { log.WithError(err).Error("Failed to find runners for call") - } else { - for j := 0; j < len(runners); j++ { + stats.Record(ctx, errorPoolCountMeasure.M(0)) + tracker.finalizeAttempts(false) + return err + } - select { - case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy - 
default: - } - - i := atomic.AddUint64(&sp.rrIndex, uint64(1)) - r := runners[int(i)%len(runners)] - - tryCtx, tryCancel := context.WithCancel(ctx) - placed, err := r.TryExec(tryCtx, call) - tryCancel() - - if err != nil && err != models.ErrCallTimeoutServerBusy { - log.WithError(err).Error("Failed during call placement") - } - if placed { - return err - } + for j := 0; j < len(runners); j++ { + if ctx.Err() != nil { + break OutTries } + + i := atomic.AddUint64(&sp.rrIndex, uint64(1)) + r := runners[int(i)%len(runners)] + + tracker.recordAttempt() + tryCtx, tryCancel := context.WithCancel(ctx) + placed, err := r.TryExec(tryCtx, call) + tryCancel() + + // Only log unusual (except for too-busy) errors + if err != nil && err != models.ErrCallTimeoutServerBusy { + log.WithError(err).Errorf("Failed during call placement, placed=%v", placed) + } + + if placed { + if err != nil { + stats.Record(ctx, placedErrorCountMeasure.M(0)) + } else { + stats.Record(ctx, placedOKCountMeasure.M(0)) + } + tracker.finalizeAttempts(true) + return err + } + + // Too Busy is super common case, we track it separately + if err == models.ErrCallTimeoutServerBusy { + stats.Record(ctx, retryTooBusyCountMeasure.M(0)) + } else { + stats.Record(ctx, retryErrorCountMeasure.M(0)) + } + } + + if len(runners) == 0 { + stats.Record(ctx, emptyPoolCountMeasure.M(0)) } // backoff select { case <-ctx.Done(): - return models.ErrCallTimeoutServerBusy + break OutTries case <-time.After(sp.rrInterval): } } + + // Cancel Exit Path / Client cancelled/timedout + stats.Record(ctx, cancelCountMeasure.M(0)) + tracker.finalizeAttempts(false) + return models.ErrCallTimeoutServerBusy } diff --git a/api/runnerpool/placer_stats.go b/api/runnerpool/placer_stats.go new file mode 100644 index 000000000..3a841d407 --- /dev/null +++ b/api/runnerpool/placer_stats.go @@ -0,0 +1,103 @@ +package runnerpool + +import ( + "context" + "math" + "time" + + "github.com/sirupsen/logrus" + "go.opencensus.io/stats" + 
"go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + attemptCountMeasure = stats.Int64("lb_placer_attempt_count", "LB Placer Number of Runners Attempted Count", "") + errorPoolCountMeasure = stats.Int64("lb_placer_rp_error_count", "LB Placer RunnerPool RunnerList Error Count", "") + emptyPoolCountMeasure = stats.Int64("lb_placer_rp_empty_count", "LB Placer RunnerPool RunnerList Empty Count", "") + cancelCountMeasure = stats.Int64("lb_placer_client_cancelled_count", "LB Placer Client Cancel Count", "") + placedErrorCountMeasure = stats.Int64("lb_placer_placed_error_count", "LB Placer Placed Call Count With Errors", "") + placedOKCountMeasure = stats.Int64("lb_placer_placed_ok_count", "LB Placer Placed Call Count Without Errors", "") + retryTooBusyCountMeasure = stats.Int64("lb_placer_retry_busy_count", "LB Placer Retry Count - Too Busy", "") + retryErrorCountMeasure = stats.Int64("lb_placer_retry_error_count", "LB Placer Retry Count - Errors", "") + placerLatencyMeasure = stats.Int64("lb_placer_latency", "LB Placer Latency", "msecs") +) + +// Helper struct for tracking LB Placer latency and attempt counts +type attemptTracker struct { + ctx context.Context + startTime time.Time + lastAttemptTime time.Time + attemptCount int64 +} + +func newAttemptTracker(ctx context.Context) *attemptTracker { + return &attemptTracker{ + ctx: ctx, + startTime: time.Now(), + } +} + +func (data *attemptTracker) finalizeAttempts(isSuccess bool) { + stats.Record(data.ctx, attemptCountMeasure.M(data.attemptCount)) + + // IMPORTANT: here we use (lastAttemptTime - startTime). We want to exclude TryExec + // latency *if* TryExec() goes through with success. Placer latency metric only shows + // how much time we are spending in the Placer loop/retries. The metric includes rtt/latency of + // *all* unsuccessful NACK (retriable) responses from runners as well.
For example, if + // Placer loop here retries 4 runners (which takes 5 msecs each) and then 5th runner + // succeeds (but takes 35 seconds to finish execution), we report 20 msecs as our LB + // latency. + endTime := data.lastAttemptTime + if !isSuccess { + endTime = time.Now() + } + + stats.Record(data.ctx, placerLatencyMeasure.M(int64(endTime.Sub(data.startTime)/time.Millisecond))) +} + +func (data *attemptTracker) recordAttempt() { + data.lastAttemptTime = time.Now() + if data.attemptCount != math.MaxInt64 { + data.attemptCount++ + } +} + +func makeKeys(names []string) []tag.Key { + var tagKeys []tag.Key + for _, name := range names { + key, err := tag.NewKey(name) + if err != nil { + logrus.WithError(err).Fatalf("cannot create tag key for %v", name) + } + tagKeys = append(tagKeys, key) + } + return tagKeys +} + +func createView(measure stats.Measure, agg *view.Aggregation, tagKeys []string) *view.View { + return &view.View{ + Name: measure.Name(), + Description: measure.Description(), + TagKeys: makeKeys(tagKeys), + Measure: measure, + Aggregation: agg, + } +} + +func RegisterPlacerViews(tagKeys []string) { + err := view.Register( + createView(attemptCountMeasure, view.Distribution(0, 1, 2, 4, 8, 32, 64, 256), tagKeys), + createView(errorPoolCountMeasure, view.Count(), tagKeys), + createView(emptyPoolCountMeasure, view.Count(), tagKeys), + createView(cancelCountMeasure, view.Count(), tagKeys), + createView(placedErrorCountMeasure, view.Count(), tagKeys), + createView(placedOKCountMeasure, view.Count(), tagKeys), + createView(retryTooBusyCountMeasure, view.Count(), tagKeys), + createView(retryErrorCountMeasure, view.Count(), tagKeys), + createView(placerLatencyMeasure, view.Distribution(1, 10, 25, 50, 200, 1000, 10000, 60000), tagKeys), + ) + if err != nil { + logrus.WithError(err).Fatal("cannot create view") + } +} diff --git a/api/runnerpool/runner_pool.go b/api/runnerpool/runner_pool.go index c5ecdb21d..961c76ffd 100644 --- a/api/runnerpool/runner_pool.go +++
b/api/runnerpool/runner_pool.go @@ -16,6 +16,7 @@ type Placer interface { // RunnerPool is the abstraction for getting an ordered list of runners to try for a call type RunnerPool interface { + // returns an error for unrecoverable errors that should not be retried Runners(call RunnerCall) ([]Runner, error) Shutdown(ctx context.Context) error } diff --git a/api/server/server.go b/api/server/server.go index 4ec983dfa..9291eb669 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -468,6 +468,12 @@ func WithAgentFromEnv() ServerOption { placer = pool.NewNaivePlacer() } + // If prometheus is enabled, add LB placer metrics to the views + if s.promExporter != nil { + keys := []string{"fn_appname", "fn_path"} + pool.RegisterPlacerViews(keys) + } + s.agent, err = agent.NewLBAgent(agent.NewCachedDataAccess(cl), runnerPool, placer) if err != nil { return errors.New("LBAgent creation failed") diff --git a/system_test.sh b/system_test.sh index 08370f9c7..2f5cb8991 100755 --- a/system_test.sh +++ b/system_test.sh @@ -10,7 +10,9 @@ function remove_system_containers { remove_system_containers -case "$1" in +DB_NAME=$1 + +case "$DB_NAME" in "sqlite3" ) rm -fr /tmp/fn_system_tests.db touch /tmp/fn_system_tests.db @@ -45,6 +47,11 @@ export FN_MAX_REQUEST_SIZE=6291456 export FN_MAX_RESPONSE_SIZE=6291456 export FN_ENABLE_NB_RESOURCE_TRACKER=1 +# +# dump prometheus metrics to this file +# +export SYSTEM_TEST_PROMETHEUS_FILE=./prometheus.${DB_NAME}.txt + cd test/fn-system-tests && FN_DB_URL=${FN_DB_URL} FN_API_URL=${FN_API_URL} go test -v -parallel ${2:-1} ./...; cd ../../ remove_system_containers diff --git a/test/fn-system-tests/.gitignore b/test/fn-system-tests/.gitignore new file mode 100644 index 000000000..2d3e3d4b0 --- /dev/null +++ b/test/fn-system-tests/.gitignore @@ -0,0 +1 @@ +prometheus.*.txt diff --git a/test/fn-system-tests/exec_test.go b/test/fn-system-tests/exec_test.go index 76a108f48..6f5d99508 100644 --- a/test/fn-system-tests/exec_test.go +++ 
b/test/fn-system-tests/exec_test.go @@ -17,16 +17,6 @@ import ( sdkmodels "github.com/fnproject/fn_go/models" ) -func LB() (string, error) { - lbURL := "http://127.0.0.1:8081" - - u, err := url.Parse(lbURL) - if err != nil { - return "", err - } - return u.Host, nil -} - func getEchoContent(respBytes []byte) (string, error) { var respJs map[string]interface{} diff --git a/test/fn-system-tests/system_test.go b/test/fn-system-tests/system_test.go index c89188400..9f079f8b9 100644 --- a/test/fn-system-tests/system_test.go +++ b/test/fn-system-tests/system_test.go @@ -13,8 +13,10 @@ import ( "github.com/sirupsen/logrus" + "io/ioutil" "net" "net/http" + "net/url" "os" "strconv" "strings" @@ -22,10 +24,22 @@ import ( "time" ) +const ( + LBAddress = "http://127.0.0.1:8081" +) + type SystemTestNodePool struct { runners []pool.Runner } +func LB() (string, error) { + u, err := url.Parse(LBAddress) + if err != nil { + return "", err + } + return u.Host, nil +} + func NewSystemTestNodePool() (pool.RunnerPool, error) { myAddr := whoAmI() runners := []string{ @@ -89,7 +103,33 @@ func SetUpSystem() (*state, error) { return state, nil } +func downloadMetrics() { + + fileName, ok := os.LookupEnv("SYSTEM_TEST_PROMETHEUS_FILE") + if !ok || fileName == "" { + return + } + + resp, err := http.Get(LBAddress + "/metrics") + if err != nil { + logrus.WithError(err).Fatal("Fetching metrics, got unexpected error") + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + logrus.WithError(err).Fatal("Reading metrics body, got unexpected error") + } + + err = ioutil.WriteFile(fileName, body, 0644) + if err != nil { + logrus.WithError(err).Fatalf("Writing metrics body to %v, got unexpected error", fileName) + } +} + func CleanUpSystem(st *state) error { + + downloadMetrics() + _, err := http.Get("http://127.0.0.1:8081/shutdown") if err != nil { return err @@ -157,6 +197,7 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { opts = append(opts, 
server.WithMQURL("")) opts = append(opts, server.WithLogURL("")) opts = append(opts, server.EnableShutdownEndpoint(ctx, func() {})) // TODO: do it properly + opts = append(opts, server.WithPrometheus()) apiURL := "http://127.0.0.1:8085" cl, err := hybrid.NewClient(apiURL) @@ -168,6 +209,10 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) { return nil, err } placer := pool.NewNaivePlacer() + + keys := []string{"fn_appname", "fn_path"} + pool.RegisterPlacerViews(keys) + agent, err := agent.NewLBAgent(agent.NewCachedDataAccess(cl), nodePool, placer) if err != nil { return nil, err