From b0c93dbd82922b37819bb01efea177f526783e55 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 17 Sep 2018 10:31:17 -0700 Subject: [PATCH] fn: new agent resource tracker metrics (#1215) New metrics for agent resource tracker: CpuUsed, CpuAvail, MemUsed, MemAvail. --- api/agent/agent.go | 47 ++++++++++++++++++--------- api/agent/call.go | 2 +- api/agent/resource.go | 71 ++++++++++++++++++++++++++++++----------- api/agent/slots.go | 2 +- api/agent/slots_test.go | 4 +-- api/agent/stats.go | 21 ++++++++++++ 6 files changed, 109 insertions(+), 38 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 7ffe9be6f..1d064d623 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -313,7 +313,7 @@ func (a *agent) submit(ctx context.Context, call *call) error { func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isStarted bool) error { if slot != nil { - slot.Close(common.BackgroundContext(ctx)) + slot.Close() } // This means call was routed (executed) @@ -466,7 +466,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err // Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not // need to start a new container, then waiters will wait. select { - case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB): if tok != nil && tok.Error() != nil { // before returning error response, as a last resort, try evicting idle containers. if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) { @@ -482,6 +482,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err return } if tok != nil { + statsUtilization(ctx, a.resources.GetUtilization()) tok.Close() } // Request routines are polling us with this a.cfg.HotPoll frequency. 
We can use this @@ -519,7 +520,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { case s := <-ch: if call.slots.acquireSlot(s) { if s.slot.Error() != nil { - s.slot.Close(ctx) + s.slot.Close() return nil, s.slot.Error() } return s.slot, nil @@ -559,7 +560,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { mem := call.Memory + uint64(call.TmpFsSize) select { - case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB): if tok.Error() != nil { return nil, tok.Error() } @@ -573,7 +574,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { select { case s := <-ch: if s.Error() != nil { - s.Close(ctx) + s.Close() return nil, s.Error() } return s, nil @@ -586,6 +587,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { type coldSlot struct { cookie drivers.Cookie tok ResourceToken + closer func() fatalErr error } @@ -615,12 +617,10 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error { return ctx.Err() } -func (s *coldSlot) Close(ctx context.Context) error { - if s.cookie != nil { - s.cookie.Close(ctx) - } - if s.tok != nil { - s.tok.Close() +func (s *coldSlot) Close() error { + if s.closer != nil { + s.closer() + s.closer = nil } return nil } @@ -636,7 +636,7 @@ type hotSlot struct { containerSpan trace.SpanContext } -func (s *hotSlot) Close(ctx context.Context) error { +func (s *hotSlot) Close() error { close(s.done) return nil } @@ -809,6 +809,7 @@ func (s *hotSlot) dispatchOldFormats(ctx context.Context, call *call) chan error func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) { ctx, span := trace.StartSpan(ctx, "agent_prep_cold") defer span.End() + statsUtilization(ctx, a.resources.GetUtilization()) call.containerState.UpdateState(ctx, ContainerStateStart, call.slots) @@ -856,11 +857,21 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots) - slot := &coldSlot{cookie, tok, err} + closer := func() { + if cookie != nil { + cookie.Close(ctx) + } + if tok != nil { + tok.Close() + } + statsUtilization(ctx, a.resources.GetUtilization()) + } + + slot := &coldSlot{cookie: cookie, tok: tok, closer: closer, fatalErr: err} select { case ch <- slot: case <-ctx.Done(): - slot.Close(ctx) + slot.Close() } } @@ -870,6 +881,12 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ctx = common.BackgroundContext(ctx) ctx, span := trace.StartSpan(ctx, "agent_run_hot") defer span.End() + + statsUtilization(ctx, a.resources.GetUtilization()) + defer func() { + statsUtilization(ctx, a.resources.GetUtilization()) + }() + defer tok.Close() // IMPORTANT: this MUST get called state.UpdateState(ctx, ContainerStateStart, call.slots) @@ -1154,7 +1171,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, // abort/shutdown/timeout, attempt to acquire and terminate, // otherwise continue processing the request if call.slots.acquireSlot(s) { - slot.Close(ctx) + slot.Close() if isEvictEvent { statsContainerEvicted(ctx) } diff --git a/api/agent/call.go b/api/agent/call.go index 068505a8c..dde96ea39 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -319,7 +319,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { } mem := c.Memory + uint64(c.TmpFsSize) - if 
!a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) { + if !a.resources.IsResourcePossible(mem, c.CPUs, c.Type == models.TypeAsync) { return nil, models.ErrCallResourceTooBig } diff --git a/api/agent/resource.go b/api/agent/resource.go index b4ba6793a..7ce8f96aa 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -13,9 +13,10 @@ import ( "strings" "sync" - "go.opencensus.io/trace" + "github.com/fnproject/fn/api/models" "github.com/sirupsen/logrus" + "go.opencensus.io/trace" ) const ( @@ -28,6 +29,17 @@ const ( var CapacityFull = errors.New("max capacity reached") +type ResourceUtilization struct { + // CPU in use + CpuUsed models.MilliCPUs + // CPU available + CpuAvail models.MilliCPUs + // Memory in use in bytes + MemUsed uint64 + // Memory available in bytes + MemAvail uint64 +} + // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. // TODO: add cpu, disk, network IO for future type ResourceTracker interface { @@ -42,12 +54,15 @@ type ResourceTracker interface { // if isNB is set, resource check is done and error token is returned without blocking. // if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow // a sync only reserve area) Memory is expected to be provided in MB units. - GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken + GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken // IsResourcePossible returns whether it's possible to fulfill the requested resources on this // machine. It must be called before GetResourceToken or GetResourceToken may hang. // Memory is expected to be provided in MB units. - IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool + IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool + + // Retrieve current stats/usage + GetUtilization() ResourceUtilization } type resourceTracker struct { @@ -105,12 +120,14 @@ func (t *resourceToken) Error() error { func (t *resourceToken) Close() error { t.once.Do(func() { - t.decrement() + if t.decrement != nil { + t.decrement() + } }) return nil } -func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint64, isAsync bool) bool { +func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool { asyncAvailMem := a.ramAsyncTotal - a.ramAsyncUsed syncAvailMem := a.ramSyncTotal - a.ramSyncUsed @@ -120,24 +137,40 @@ func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint // For sync functions, we can steal from async pool. For async, we restrict it to sync pool if isAsync { - return asyncAvailMem >= memory && asyncAvailCPU >= cpuQuota + return asyncAvailMem >= memory && asyncAvailCPU >= uint64(cpuQuota) } else { - return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= cpuQuota + return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= uint64(cpuQuota) } } +func (a *resourceTracker) GetUtilization() ResourceUtilization { + var util ResourceUtilization + + a.cond.L.Lock() + + util.CpuUsed = models.MilliCPUs(a.cpuAsyncUsed + a.cpuSyncUsed) + util.MemUsed = a.ramAsyncUsed + a.ramSyncUsed + + a.cond.L.Unlock() + + util.CpuAvail = models.MilliCPUs(a.cpuAsyncTotal+a.cpuSyncTotal) - util.CpuUsed + util.MemAvail = a.ramAsyncTotal + a.ramSyncTotal - util.MemUsed + + return util +} + // is this request possible to meet? 
If no, fail quick -func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isAsync bool) bool { +func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool { memory = memory * Mem1MB if isAsync { - return memory <= a.ramAsyncTotal && cpuQuota <= a.cpuAsyncTotal + return memory <= a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuAsyncTotal } else { - return memory <= a.ramSyncTotal+a.ramAsyncTotal && cpuQuota <= a.cpuSyncTotal+a.cpuAsyncTotal + return memory <= a.ramSyncTotal+a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuSyncTotal+a.cpuAsyncTotal } } -func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken { +func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken { var asyncMem, syncMem uint64 var asyncCPU, syncCPU uint64 @@ -145,14 +178,14 @@ func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync if isAsync { // async uses async pool only asyncMem = memory - asyncCPU = cpuQuota + asyncCPU = uint64(cpuQuota) } else { // if sync fits async + sync pool syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory) - syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota) + syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, uint64(cpuQuota)) asyncMem = memory - syncMem - asyncCPU = cpuQuota - syncCPU + asyncCPU = uint64(cpuQuota) - syncCPU } a.ramAsyncUsed += asyncMem @@ -176,9 +209,9 @@ func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync }} } -func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isAsync bool) ResourceToken { +func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken { if !a.IsResourcePossible(memory, cpuQuota, isAsync) { - return &resourceToken{decrement: func() {}, err: CapacityFull} + return &resourceToken{err: CapacityFull} } memory = memory * Mem1MB @@ -187,7 +220,7 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isA a.cond.L.Lock() if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) { - t = &resourceToken{decrement: func() {}, err: CapacityFull} + t = &resourceToken{err: CapacityFull} } else { t = a.allocResourcesLocked(memory, cpuQuota, isAsync) } @@ -196,7 +229,7 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isA return t } -func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken { +func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync bool) <-chan ResourceToken { ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan") ch := make(chan ResourceToken) @@ -217,7 +250,7 @@ func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uin // the received token should be passed directly to launch (unconditionally), launch // will close this token (i.e. 
the receiver should not call Close) -func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken { +func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken { if isNB { return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync) } diff --git a/api/agent/slots.go b/api/agent/slots.go index d782edc2d..f00ec2ace 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -20,7 +20,7 @@ import ( type Slot interface { exec(ctx context.Context, call *call) error - Close(ctx context.Context) error + Close() error Error() error } diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index b61c92fcd..4c6b66898 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -22,7 +22,7 @@ func (a *testSlot) exec(ctx context.Context, call *call) error { return nil } -func (a *testSlot) Close(ctx context.Context) error { +func (a *testSlot) Close() error { if a.isClosed { panic(fmt.Errorf("id=%d already closed %v", a.id, a)) } @@ -55,7 +55,7 @@ func checkGetTokenId(t *testing.T, a *slotQueue, dur time.Duration, id uint64) e continue } - z.slot.Close(ctx) + z.slot.Close() if z.id != id { return fmt.Errorf("Bad slotToken received: %#v expected: %d", z, id) diff --git a/api/agent/stats.go b/api/agent/stats.go index abee53101..afb890060 100644 --- a/api/agent/stats.go +++ b/api/agent/stats.go @@ -64,6 +64,13 @@ func statsContainerEvicted(ctx context.Context) { stats.Record(ctx, containerEvictedMeasure.M(0)) } +func statsUtilization(ctx context.Context, util ResourceUtilization) { + stats.Record(ctx, utilCpuUsedMeasure.M(int64(util.CpuUsed))) + stats.Record(ctx, utilCpuAvailMeasure.M(int64(util.CpuAvail))) + stats.Record(ctx, utilMemUsedMeasure.M(int64(util.MemUsed))) + stats.Record(ctx, utilMemAvailMeasure.M(int64(util.MemAvail))) +} + const ( // // WARNING: Dual Role Metrics both used in Runner/Agent and LB-Agent @@ -101,6 +108,11 @@ const ( containerEvictedMetricName = "container_evictions" + utilCpuUsedMetricName = "util_cpu_used" + utilCpuAvailMetricName = "util_cpu_avail" + utilMemUsedMetricName = "util_mem_used" + utilMemAvailMetricName = "util_mem_avail" + // Reported By LB runnerSchedLatencyMetricName = "lb_runner_sched_latency" runnerExecLatencyMetricName = "lb_runner_exec_latency" @@ -119,6 +131,11 @@ var ( containerGaugeMeasures = initContainerGaugeMeasures() containerTimeMeasures = initContainerTimeMeasures() + utilCpuUsedMeasure = common.MakeMeasure(utilCpuUsedMetricName, "agent cpu in use", "") + utilCpuAvailMeasure = common.MakeMeasure(utilCpuAvailMetricName, "agent cpu available", "") + utilMemUsedMeasure = common.MakeMeasure(utilMemUsedMetricName, "agent memory in use", "By") + utilMemAvailMeasure = common.MakeMeasure(utilMemAvailMetricName, "agent memory available", "By") + containerEvictedMeasure = common.MakeMeasure(containerEvictedMetricName, "containers evicted", "") // Reported By LB: How long does a runner scheduler wait for a committed call? eg. 
wait/launch/pull containers @@ -148,6 +165,10 @@ func RegisterAgentViews(tagKeys []string, latencyDist []float64) { common.CreateView(timedoutMeasure, view.Sum(), tagKeys), common.CreateView(errorsMeasure, view.Sum(), tagKeys), common.CreateView(serverBusyMeasure, view.Sum(), tagKeys), + common.CreateView(utilCpuUsedMeasure, view.LastValue(), tagKeys), + common.CreateView(utilCpuAvailMeasure, view.LastValue(), tagKeys), + common.CreateView(utilMemUsedMeasure, view.LastValue(), tagKeys), + common.CreateView(utilMemAvailMeasure, view.LastValue(), tagKeys), ) if err != nil { logrus.WithError(err).Fatal("cannot register view")
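For readers skimming the diff, the heart of the change is the new GetUtilization() accessor in resource.go, which folds the async and sync pools into a single used/available snapshot. Below is a minimal, self-contained sketch of that arithmetic, assuming a simplified tracker with the same pool fields as resourceTracker but a plain mutex in place of its sync.Cond; it is illustrative only, not the agent's code.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // MilliCPUs mirrors models.MilliCPUs: 1000 equals one full CPU core.
    type MilliCPUs uint64

    // ResourceUtilization matches the struct added in resource.go.
    type ResourceUtilization struct {
    	CpuUsed  MilliCPUs // CPU in use
    	CpuAvail MilliCPUs // CPU available
    	MemUsed  uint64    // memory in use, in bytes
    	MemAvail uint64    // memory available, in bytes
    }

    // simpleTracker is a cut-down stand-in for resourceTracker: same async/sync
    // pool split, but guarded by a plain mutex instead of a sync.Cond.
    type simpleTracker struct {
    	mu sync.Mutex

    	ramSyncTotal, ramSyncUsed   uint64
    	ramAsyncTotal, ramAsyncUsed uint64
    	cpuSyncTotal, cpuSyncUsed   uint64
    	cpuAsyncTotal, cpuAsyncUsed uint64
    }

    // GetUtilization reproduces the arithmetic of the patched method: usage is
    // read under the lock, the (immutable) totals are combined outside of it.
    func (t *simpleTracker) GetUtilization() ResourceUtilization {
    	var util ResourceUtilization

    	t.mu.Lock()
    	util.CpuUsed = MilliCPUs(t.cpuAsyncUsed + t.cpuSyncUsed)
    	util.MemUsed = t.ramAsyncUsed + t.ramSyncUsed
    	t.mu.Unlock()

    	util.CpuAvail = MilliCPUs(t.cpuAsyncTotal+t.cpuSyncTotal) - util.CpuUsed
    	util.MemAvail = t.ramAsyncTotal + t.ramSyncTotal - util.MemUsed
    	return util
    }

    func main() {
    	t := &simpleTracker{
    		ramSyncTotal: 2 << 30, ramAsyncTotal: 6 << 30, // 2GB sync + 6GB async
    		cpuSyncTotal: 1000, cpuAsyncTotal: 3000, // 1 + 3 cores, in milli-CPUs
    		ramSyncUsed: 256 << 20, cpuSyncUsed: 500, // one 256MB / 500 mCPU call running
    	}
    	u := t.GetUtilization()
    	fmt.Printf("cpu used=%d avail=%d  mem used=%dB avail=%dB\n",
    		u.CpuUsed, u.CpuAvail, u.MemUsed, u.MemAvail)
    }

Only the usage counters are read under the lock; the totals never change after construction, so combining them outside the critical section is safe, and the snapshot is only as fresh as the moment it was taken, which is all a gauge needs.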
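The snapshots are published wherever the resource picture changes: right after a token is granted (prepCold, runHot, and the tok != nil branch of checkLaunch) and again once a container's resources are returned (the deferred call in runHot, and the closer installed on coldSlot, which is also what lets Slot.Close() drop its context parameter). The sketch below shows only the coldSlot closer pattern, with hypothetical stand-ins for cookie.Close(ctx), tok.Close() and statsUtilization; it is a shape illustration under those assumptions, not the patch's code.

    package main

    import (
    	"context"
    	"fmt"
    )

    // closeOnceSlot mimics the coldSlot change: cleanup of the cookie, the
    // resource token and the final utilization snapshot is captured in a closure
    // while the request context is still in scope, so Close() itself no longer
    // needs a context and a repeated Close() is a harmless no-op.
    type closeOnceSlot struct {
    	closer func()
    }

    func (s *closeOnceSlot) Close() error {
    	if s.closer != nil {
    		s.closer()
    		s.closer = nil
    	}
    	return nil
    }

    // newColdSlot builds a slot with its closer; the printed steps stand in for
    // cookie.Close(ctx), tok.Close() and statsUtilization(ctx, ...).
    func newColdSlot(ctx context.Context) *closeOnceSlot {
    	return &closeOnceSlot{closer: func() {
    		fmt.Println("cookie closed using captured ctx:", ctx)
    		fmt.Println("resource token released")
    		fmt.Println("utilization snapshot recorded")
    	}}
    }

    func main() {
    	slot := newColdSlot(context.Background())
    	slot.Close()
    	slot.Close() // no-op: the closer was cleared on the first call
    }

Capturing the context in the closure is the design choice that allows the Slot interface, hotSlot, and the tests to switch to a context-free Close(), as the diff does across agent.go, slots.go and slots_test.go.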
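Finally, the four measures themselves are created through fn's common.MakeMeasure/common.CreateView helpers and aggregated with view.LastValue(), so they behave as gauges rather than counters. For readers unfamiliar with those wrappers, here is a rough equivalent written directly against the OpenCensus API; the names, descriptions and units are copied from stats.go, while everything else (the gaugeView helper, the sample values) is assumed for illustration.

    package main

    import (
    	"context"
    	"log"

    	"go.opencensus.io/stats"
    	"go.opencensus.io/stats/view"
    )

    // The same four measures the patch creates via common.MakeMeasure, declared
    // here with the plain OpenCensus API.
    var (
    	utilCpuUsed  = stats.Int64("util_cpu_used", "agent cpu in use", "")
    	utilCpuAvail = stats.Int64("util_cpu_avail", "agent cpu available", "")
    	utilMemUsed  = stats.Int64("util_mem_used", "agent memory in use", "By")
    	utilMemAvail = stats.Int64("util_mem_avail", "agent memory available", "By")
    )

    // gaugeView wraps a measure in a LastValue view: each export interval reports
    // the most recently recorded value instead of a running sum.
    func gaugeView(m stats.Measure) *view.View {
    	return &view.View{
    		Name:        m.Name(),
    		Description: m.Description(),
    		Measure:     m,
    		Aggregation: view.LastValue(),
    	}
    }

    func main() {
    	if err := view.Register(
    		gaugeView(utilCpuUsed), gaugeView(utilCpuAvail),
    		gaugeView(utilMemUsed), gaugeView(utilMemAvail),
    	); err != nil {
    		log.Fatalf("cannot register views: %v", err)
    	}

    	// One snapshot, as statsUtilization records for a ResourceUtilization
    	// value of 500/3500 mCPU and 256MB used out of 8GB.
    	stats.Record(context.Background(),
    		utilCpuUsed.M(500), utilCpuAvail.M(3500),
    		utilMemUsed.M(256<<20), utilMemAvail.M((8<<30)-(256<<20)))
    }

In the agent these views are registered once, inside RegisterAgentViews, alongside the existing counters and latency distributions.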