fn: latency metrics for various call states (#1332)

* fn: latency metrics for various call states

This complements the API latency metrics available
on the LB agent. In this case, we would like to measure
calls that have finished with the following statuses:
     "completed"
     "canceled"
     "timeouts"
     "errors"
     "server_busy"
and while measuring this latency, we subtract the
amount of time the actual function execution took. This
is not precise, but an approximation mostly suitable
for trending.

Going forward, we could also subtract UDS wait time and/or
docker pull latency from this latency as an enhancement
to this PR.
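
A minimal sketch (not part of this commit) of the approximation described above; the helper name and the sample durations below are illustrative:

    package main

    import (
        "fmt"
        "time"
    )

    // approximateCallLatency (hypothetical helper): the time the platform spent on a
    // call, minus the runner-reported time spent executing user code. A zero result
    // signals suspicious runner data that should not be recorded.
    func approximateCallLatency(startedAt time.Time, userExec time.Duration) time.Duration {
        total := time.Since(startedAt)
        if userExec >= total {
            return 0
        }
        return total - userExec
    }

    func main() {
        startedAt := time.Now().Add(-500 * time.Millisecond) // call started ~500ms ago
        userExec := 350 * time.Millisecond                    // execution time reported by the runner
        fmt.Println(approximateCallLatency(startedAt, userExec)) // roughly 150ms of platform overhead
    }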
Author: Tolga Ceylan
Date: 2018-11-30 13:20:59 -08:00 (committed by GitHub)
Parent: 88ba9e6215
Commit: df53250958
8 changed files with 116 additions and 11 deletions


@@ -279,6 +279,9 @@ type call struct {
requestState RequestState
slotHashId string
// amount of time attributed to user-code execution
userExecTime *time.Duration
// LB & Pure Runner Extra Config
extensions map[string]string
}
@@ -311,6 +314,17 @@ func (c *call) StdErr() io.ReadWriteCloser {
return c.stderr
}
func (c *call) AddUserExecutionTime(dur time.Duration) {
if c.userExecTime == nil {
c.userExecTime = new(time.Duration)
}
*c.userExecTime += dur
}
func (c *call) GetUserExecutionTime() *time.Duration {
return c.userExecTime
}
func (c *call) Model() *models.Call { return c.Call }
func (c *call) Start(ctx context.Context) error {


@@ -303,28 +303,68 @@ func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isFo
statsStopRun(ctx)
if err == nil {
statsComplete(ctx)
recordCallLatency(ctx, call, completedMetricName)
}
} else {
statsDequeue(ctx)
if err == context.DeadlineExceeded {
statsTooBusy(ctx)
recordCallLatency(ctx, call, serverBusyMetricName)
return models.ErrCallTimeoutServerBusy
}
}
if err == models.ErrCallTimeoutServerBusy {
statsTooBusy(ctx)
recordCallLatency(ctx, call, serverBusyMetricName)
return models.ErrCallTimeoutServerBusy
} else if err == context.DeadlineExceeded {
statsTimedout(ctx)
recordCallLatency(ctx, call, timedoutMetricName)
return models.ErrCallTimeout
} else if err == context.Canceled {
statsCanceled(ctx)
recordCallLatency(ctx, call, canceledMetricName)
} else if err != nil {
statsErrors(ctx)
recordCallLatency(ctx, call, errorsMetricName)
}
return err
}
func recordCallLatency(ctx context.Context, call *call, status string) {
// IMPORTANT: Why do we prefer 'StartedAt'? Because we would like to exclude
// the client's transmission of the request body to the LB. We are trying to
// measure how long it took us to execute a user function and obtain its response.
// Notice how we cache the client body *before* we call call.Start(), where StartedAt
// is set. If call.Start() has not been called yet, we use call.CreatedAt instead.
var callLatency time.Duration
if !time.Time(call.StartedAt).IsZero() {
callLatency = time.Now().Sub(time.Time(call.StartedAt))
} else if !time.Time(call.CreatedAt).IsZero() {
callLatency = time.Now().Sub(time.Time(call.CreatedAt))
} else {
common.Logger(ctx).Error("cannot determine call start time")
return
}
// We want to exclude time spent in user code. Today, this is the container
// request processing latency as observed by the runner agent.
execLatency := call.GetUserExecutionTime()
// Sanity check first. If this flags something, then it is likely that
// runners are sending malicious/suspicious data.
if execLatency != nil {
if *execLatency >= callLatency {
common.Logger(ctx).Errorf("invalid latency callLatency=%v execLatency=%v", callLatency, execLatency)
return
}
callLatency -= *execLatency
}
statsCallLatency(ctx, callLatency, status)
}
var _ Agent = &lbAgent{}
var _ callTrigger = &lbAgent{}


@@ -125,6 +125,9 @@ type mockRunnerCall struct {
stdErr io.ReadWriteCloser
model *models.Call
slotHashId string
// amount of time spent in user execution inside the container
userExecTime *time.Duration
}
func (c *mockRunnerCall) SlotHashId() string {
@@ -151,6 +154,17 @@ func (c *mockRunnerCall) Model() *models.Call {
return c.model
}
func (c *mockRunnerCall) AddUserExecutionTime(dur time.Duration) {
if c.userExecTime == nil {
c.userExecTime = new(time.Duration)
}
*c.userExecTime += dur
}
func (c *mockRunnerCall) GetUserExecutionTime() *time.Duration {
return c.userExecTime
}
func setupMockRunnerPool(expectedRunners []string, execSleep time.Duration, maxCalls int32) *mockRunnerPool {
rf := NewMockRunnerFactory(execSleep, maxCalls)
return newMockRunnerPool(rf, expectedRunners)


@@ -287,7 +287,7 @@ func translateDate(dt string) time.Time {
return time.Time{}
}
func recordFinishStats(ctx context.Context, msg *pb.CallFinished) {
func recordFinishStats(ctx context.Context, msg *pb.CallFinished, c pool.RunnerCall) {
creatTs := translateDate(msg.GetCreatedAt())
startTs := translateDate(msg.GetStartedAt())
@@ -295,8 +295,14 @@ func recordFinishStats(ctx context.Context, msg *pb.CallFinished) {
// Validate this, as the info *is* coming from the runner and its local clock.
if !creatTs.IsZero() && !startTs.IsZero() && !complTs.IsZero() && !startTs.Before(creatTs) && !complTs.Before(startTs) {
statsLBAgentRunnerSchedLatency(ctx, startTs.Sub(creatTs))
statsLBAgentRunnerExecLatency(ctx, complTs.Sub(startTs))
runnerSchedLatency := startTs.Sub(creatTs)
runnerExecLatency := complTs.Sub(startTs)
statsLBAgentRunnerSchedLatency(ctx, runnerSchedLatency)
statsLBAgentRunnerExecLatency(ctx, runnerExecLatency)
c.AddUserExecutionTime(runnerExecLatency)
}
}
@@ -355,7 +361,7 @@ DataLoop:
// Finish messages are required to finish/finalize the processing.
case *pb.RunnerMsg_Finished:
logCallFinish(log, body, w.Header(), statusCode)
recordFinishStats(ctx, body.Finished)
recordFinishStats(ctx, body.Finished, c)
if !body.Finished.Success {
err := parseError(body.Finished)
tryQueueError(err, done)


@@ -15,6 +15,7 @@ import (
var (
containerStateKey = common.MakeKey("container_state")
callStatusKey = common.MakeKey("call_status")
)
func statsCalls(ctx context.Context) {
@@ -83,6 +84,16 @@ func statsUtilization(ctx context.Context, util ResourceUtilization) {
stats.Record(ctx, utilMemAvailMeasure.M(int64(util.MemAvail)))
}
func statsCallLatency(ctx context.Context, dur time.Duration, callStatus string) {
ctx, err := tag.New(ctx,
tag.Upsert(callStatusKey, callStatus),
)
if err != nil {
logrus.Fatal(err)
}
stats.Record(ctx, callLatencyMeasure.M(int64(dur/time.Millisecond)))
}
const (
//
// WARNING: Dual Role Metrics both used in Runner/Agent and LB-Agent
@@ -128,6 +139,7 @@ const (
// Reported By LB
runnerSchedLatencyMetricName = "lb_runner_sched_latency"
runnerExecLatencyMetricName = "lb_runner_exec_latency"
callLatencyMetricName = "lb_call_latency"
)
var (
@@ -154,12 +166,24 @@ var (
runnerSchedLatencyMeasure = common.MakeMeasure(runnerSchedLatencyMetricName, "Runner Scheduler Latency Reported By LBAgent", "msecs")
// Reported By LB: Function execution time inside a container.
runnerExecLatencyMeasure = common.MakeMeasure(runnerExecLatencyMetricName, "Runner Container Execution Latency Reported By LBAgent", "msecs")
// Reported By LB: Function total call latency (excluding function execution inside the container)
callLatencyMeasure = common.MakeMeasure(callLatencyMetricName, "LB Call Latency Reported By LBAgent", "msecs")
)
func RegisterLBAgentViews(tagKeys []string, latencyDist []float64) {
// add call_status tag for call latency
callLatencyTags := make([]string, 0, len(tagKeys)+1)
callLatencyTags = append(callLatencyTags, "call_status")
for _, key := range tagKeys {
if key != "call_status" {
callLatencyTags = append(callLatencyTags, key)
}
}
err := view.Register(
common.CreateView(runnerSchedLatencyMeasure, view.Distribution(latencyDist...), tagKeys),
common.CreateView(runnerExecLatencyMeasure, view.Distribution(latencyDist...), tagKeys),
common.CreateView(callLatencyMeasure, view.Distribution(latencyDist...), callLatencyTags),
)
if err != nil {
logrus.WithError(err).Fatal("cannot register view")
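
For reference, a self-contained sketch (not from this commit) of the OpenCensus pattern the stats changes rely on: a millisecond measure, a call_status tag, and a distribution view. All identifiers below are illustrative, not the ones used in this file:

    package main

    import (
        "context"
        "log"
        "time"

        "go.opencensus.io/stats"
        "go.opencensus.io/stats/view"
        "go.opencensus.io/tag"
    )

    var (
        statusKey, _   = tag.NewKey("call_status")
        latencyMeasure = stats.Int64("example_call_latency", "Example call latency", "msecs")
    )

    // recordLatency tags the context with the call status and records the latency in msecs.
    func recordLatency(ctx context.Context, dur time.Duration, status string) {
        ctx, err := tag.New(ctx, tag.Upsert(statusKey, status))
        if err != nil {
            log.Fatal(err)
        }
        stats.Record(ctx, latencyMeasure.M(int64(dur/time.Millisecond)))
    }

    func main() {
        v := &view.View{
            Name:        "example_call_latency",
            Measure:     latencyMeasure,
            Description: "Example call latency distribution by call status",
            TagKeys:     []tag.Key{statusKey},
            Aggregation: view.Distribution(1, 10, 50, 100, 250, 500, 1000),
        }
        if err := view.Register(v); err != nil {
            log.Fatal(err)
        }
        recordLatency(context.Background(), 150*time.Millisecond, "completed")
    }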


@@ -5,6 +5,7 @@ import (
"crypto/tls"
"io"
"net/http"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
@@ -61,4 +62,7 @@ type RunnerCall interface {
ResponseWriter() http.ResponseWriter
StdErr() io.ReadWriteCloser
Model() *models.Call
// For metrics/stats, add special accounting for time spent in customer code
AddUserExecutionTime(dur time.Duration)
GetUserExecutionTime() *time.Duration
}


@@ -94,12 +94,14 @@ func TestCannotExecuteStatusImage(t *testing.T) {
type myCall struct{}
// implements RunnerCall
func (c *myCall) SlotHashId() string { return "" }
func (c *myCall) Extensions() map[string]string { return nil }
func (c *myCall) RequestBody() io.ReadCloser { return nil }
func (c *myCall) ResponseWriter() http.ResponseWriter { return nil }
func (c *myCall) StdErr() io.ReadWriteCloser { return nil }
func (c *myCall) Model() *models.Call { return nil }
func (c *myCall) SlotHashId() string { return "" }
func (c *myCall) Extensions() map[string]string { return nil }
func (c *myCall) RequestBody() io.ReadCloser { return nil }
func (c *myCall) ResponseWriter() http.ResponseWriter { return nil }
func (c *myCall) StdErr() io.ReadWriteCloser { return nil }
func (c *myCall) Model() *models.Call { return nil }
func (c *myCall) GetUserExecutionTime() *time.Duration { return nil }
func (c *myCall) AddUserExecutionTime(time.Duration) {}
func TestExecuteRunnerStatus(t *testing.T) {
buf := setLogBuffer()


@@ -110,6 +110,7 @@ func setUpSystem() (*state, error) {
func downloadMetrics() {
time.Sleep(4 * time.Second)
fileName, ok := os.LookupEnv("SYSTEM_TEST_PROMETHEUS_FILE")
if !ok || fileName == "" {
return
@@ -225,7 +226,7 @@ func SetUpLBNode(ctx context.Context) (*server.Server, error) {
placerCfg := pool.NewPlacerConfig()
placer := pool.NewNaivePlacer(&placerCfg)
keys := []string{"fn_appname", "fn_path"}
keys := []string{}
dist := []float64{1, 10, 50, 100, 250, 500, 1000, 10000, 60000, 120000}
pool.RegisterPlacerViews(keys, dist)
agent.RegisterLBAgentViews(keys, dist)