fn: new agent resource tracker metrics (#1215)

Adds new metrics to the agent resource tracker: CpuUsed, CpuAvail,
MemUsed, and MemAvail. A short sketch of how these metrics are wired up
follows the commit metadata below.
Tolga Ceylan authored on 2018-09-17 10:31:17 -07:00, committed by GitHub
Parent: dd727dfd12
Commit: b0c93dbd82
6 changed files with 109 additions and 38 deletions
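For orientation before the diffs, here is a minimal, standalone sketch of the metric plumbing this commit adds, written against the public OpenCensus Go API. The measure names, units, and the LastValue aggregation are taken from the stats.go hunks at the end of this commit; the package layout, the two stand-in measure variables, and the sample values are illustrative only.

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// Stand-ins for two of the agent's new utilization measures; the real ones
// (and their CPU/memory "avail" counterparts) are declared in stats.go below.
var (
	utilCpuUsed = stats.Int64("util_cpu_used", "agent cpu in use", "")
	utilMemUsed = stats.Int64("util_mem_used", "agent memory in use", "By")
)

func main() {
	// The commit registers these measures with view.LastValue(), i.e. each view
	// behaves as a gauge reporting the most recent utilization snapshot.
	err := view.Register(
		&view.View{Name: "util_cpu_used", Measure: utilCpuUsed, Aggregation: view.LastValue()},
		&view.View{Name: "util_mem_used", Measure: utilMemUsed, Aggregation: view.LastValue()},
	)
	if err != nil {
		log.Fatal(err)
	}

	// statsUtilization (added in stats.go) records a snapshot like this whenever
	// the agent acquires or releases resources.
	stats.Record(context.Background(), utilCpuUsed.M(750), utilMemUsed.M(384<<20))
}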


@@ -313,7 +313,7 @@ func (a *agent) submit(ctx context.Context, call *call) error {
 func (a *agent) handleCallEnd(ctx context.Context, call *call, slot Slot, err error, isStarted bool) error {
 	if slot != nil {
-		slot.Close(common.BackgroundContext(ctx))
+		slot.Close()
 	}
 	// This means call was routed (executed)
@@ -466,7 +466,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	// Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not
 	// need to start a new container, then waiters will wait.
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
+	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB):
 		if tok != nil && tok.Error() != nil {
 			// before returning error response, as a last resort, try evicting idle containers.
 			if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
@@ -482,6 +482,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 			return
 		}
 		if tok != nil {
+			statsUtilization(ctx, a.resources.GetUtilization())
 			tok.Close()
 		}
 		// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
@@ -519,7 +520,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 		case s := <-ch:
 			if call.slots.acquireSlot(s) {
 				if s.slot.Error() != nil {
-					s.slot.Close(ctx)
+					s.slot.Close()
 					return nil, s.slot.Error()
 				}
 				return s.slot, nil
@@ -559,7 +560,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	mem := call.Memory + uint64(call.TmpFsSize)
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
+	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB):
 		if tok.Error() != nil {
 			return nil, tok.Error()
 		}
@@ -573,7 +574,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	select {
 	case s := <-ch:
 		if s.Error() != nil {
-			s.Close(ctx)
+			s.Close()
 			return nil, s.Error()
 		}
 		return s, nil
@@ -586,6 +587,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 type coldSlot struct {
 	cookie   drivers.Cookie
 	tok      ResourceToken
+	closer   func()
 	fatalErr error
 }
@@ -615,12 +617,10 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error {
 	return ctx.Err()
 }
-func (s *coldSlot) Close(ctx context.Context) error {
-	if s.cookie != nil {
-		s.cookie.Close(ctx)
-	}
-	if s.tok != nil {
-		s.tok.Close()
+func (s *coldSlot) Close() error {
+	if s.closer != nil {
+		s.closer()
+		s.closer = nil
 	}
 	return nil
 }
@@ -636,7 +636,7 @@ type hotSlot struct {
 	containerSpan trace.SpanContext
 }
-func (s *hotSlot) Close(ctx context.Context) error {
+func (s *hotSlot) Close() error {
 	close(s.done)
 	return nil
 }
@@ -809,6 +809,7 @@ func (s *hotSlot) dispatchOldFormats(ctx context.Context, call *call) chan error
 func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
 	ctx, span := trace.StartSpan(ctx, "agent_prep_cold")
 	defer span.End()
+	statsUtilization(ctx, a.resources.GetUtilization())
 	call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
@@ -856,11 +857,21 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
 	call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots)
-	slot := &coldSlot{cookie, tok, err}
+	closer := func() {
+		if cookie != nil {
+			cookie.Close(ctx)
+		}
+		if tok != nil {
+			tok.Close()
+		}
+		statsUtilization(ctx, a.resources.GetUtilization())
+	}
+	slot := &coldSlot{cookie: cookie, tok: tok, closer: closer, fatalErr: err}
 	select {
 	case ch <- slot:
 	case <-ctx.Done():
-		slot.Close(ctx)
+		slot.Close()
 	}
 }
@@ -870,6 +881,12 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
 	ctx = common.BackgroundContext(ctx)
 	ctx, span := trace.StartSpan(ctx, "agent_run_hot")
 	defer span.End()
+	statsUtilization(ctx, a.resources.GetUtilization())
+	defer func() {
+		statsUtilization(ctx, a.resources.GetUtilization())
+	}()
 	defer tok.Close() // IMPORTANT: this MUST get called
 	state.UpdateState(ctx, ContainerStateStart, call.slots)
@@ -1154,7 +1171,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 		// abort/shutdown/timeout, attempt to acquire and terminate,
 		// otherwise continue processing the request
 		if call.slots.acquireSlot(s) {
-			slot.Close(ctx)
+			slot.Close()
 			if isEvictEvent {
 				statsContainerEvicted(ctx)
 			}
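A note on the coldSlot changes above: since Slot.Close() no longer takes a context, the context-dependent cleanup (cookie.Close(ctx), tok.Close(), the final utilization sample) is captured in a closer closure when the slot is built. Below is a standalone sketch of that pattern, with hypothetical names (slot, releases) standing in for the real types.

package main

import "fmt"

// slot mimics coldSlot after this commit: the cleanup work is bound into a
// closure at construction time, so Close() needs no arguments.
type slot struct {
	closer func()
}

func (s *slot) Close() error {
	if s.closer != nil {
		s.closer()
		s.closer = nil // a second Close() becomes a no-op
	}
	return nil
}

func main() {
	releases := 0
	s := &slot{closer: func() {
		releases++ // stands in for cookie.Close(ctx), tok.Close(), statsUtilization(...)
	}}
	s.Close()
	s.Close()
	fmt.Println("releases:", releases) // prints: releases: 1
}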


@@ -319,7 +319,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
 	}
 	mem := c.Memory + uint64(c.TmpFsSize)
-	if !a.resources.IsResourcePossible(mem, uint64(c.CPUs), c.Type == models.TypeAsync) {
+	if !a.resources.IsResourcePossible(mem, c.CPUs, c.Type == models.TypeAsync) {
 		return nil, models.ErrCallResourceTooBig
 	}


@@ -13,9 +13,10 @@ import (
 	"strings"
 	"sync"
-	"go.opencensus.io/trace"
+	"github.com/fnproject/fn/api/models"
 	"github.com/sirupsen/logrus"
+	"go.opencensus.io/trace"
 )
 const (
@@ -28,6 +29,17 @@ const (
 var CapacityFull = errors.New("max capacity reached")
+type ResourceUtilization struct {
+	// CPU in use
+	CpuUsed models.MilliCPUs
+	// CPU available
+	CpuAvail models.MilliCPUs
+	// Memory in use in bytes
+	MemUsed uint64
+	// Memory available in bytes
+	MemAvail uint64
+}
+
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
@@ -42,12 +54,15 @@ type ResourceTracker interface {
 	// if isNB is set, resource check is done and error token is returned without blocking.
 	// if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow
 	// a sync only reserve area) Memory is expected to be provided in MB units.
-	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken
+	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken
 	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
 	// Memory is expected to be provided in MB units.
-	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
+	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool
+
+	// Retrieve current stats/usage
+	GetUtilization() ResourceUtilization
 }
 type resourceTracker struct {
@@ -105,12 +120,14 @@ func (t *resourceToken) Error() error {
 func (t *resourceToken) Close() error {
 	t.once.Do(func() {
-		t.decrement()
+		if t.decrement != nil {
+			t.decrement()
+		}
 	})
 	return nil
 }
-func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint64, isAsync bool) bool {
+func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool {
 	asyncAvailMem := a.ramAsyncTotal - a.ramAsyncUsed
 	syncAvailMem := a.ramSyncTotal - a.ramSyncUsed
@@ -120,24 +137,40 @@ func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint
 	// For sync functions, we can steal from async pool. For async, we restrict it to sync pool
 	if isAsync {
-		return asyncAvailMem >= memory && asyncAvailCPU >= cpuQuota
+		return asyncAvailMem >= memory && asyncAvailCPU >= uint64(cpuQuota)
 	} else {
-		return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= cpuQuota
+		return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= uint64(cpuQuota)
 	}
 }
+func (a *resourceTracker) GetUtilization() ResourceUtilization {
+	var util ResourceUtilization
+
+	a.cond.L.Lock()
+	util.CpuUsed = models.MilliCPUs(a.cpuAsyncUsed + a.cpuSyncUsed)
+	util.MemUsed = a.ramAsyncUsed + a.ramSyncUsed
+	a.cond.L.Unlock()
+
+	util.CpuAvail = models.MilliCPUs(a.cpuAsyncTotal+a.cpuSyncTotal) - util.CpuUsed
+	util.MemAvail = a.ramAsyncTotal + a.ramSyncTotal - util.MemUsed
+
+	return util
+}
+
 // is this request possible to meet? If no, fail quick
-func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isAsync bool) bool {
+func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool {
 	memory = memory * Mem1MB
 	if isAsync {
-		return memory <= a.ramAsyncTotal && cpuQuota <= a.cpuAsyncTotal
+		return memory <= a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuAsyncTotal
 	} else {
-		return memory <= a.ramSyncTotal+a.ramAsyncTotal && cpuQuota <= a.cpuSyncTotal+a.cpuAsyncTotal
+		return memory <= a.ramSyncTotal+a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuSyncTotal+a.cpuAsyncTotal
 	}
 }
-func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken {
+func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken {
 	var asyncMem, syncMem uint64
 	var asyncCPU, syncCPU uint64
@@ -145,14 +178,14 @@ func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync
 	if isAsync {
 		// async uses async pool only
 		asyncMem = memory
-		asyncCPU = cpuQuota
+		asyncCPU = uint64(cpuQuota)
 	} else {
 		// if sync fits async + sync pool
 		syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory)
-		syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota)
+		syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, uint64(cpuQuota))
 		asyncMem = memory - syncMem
-		asyncCPU = cpuQuota - syncCPU
+		asyncCPU = uint64(cpuQuota) - syncCPU
 	}
 	a.ramAsyncUsed += asyncMem
@@ -176,9 +209,9 @@ func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync
 	}}
 }
-func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isAsync bool) ResourceToken {
+func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken {
 	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
-		return &resourceToken{decrement: func() {}, err: CapacityFull}
+		return &resourceToken{err: CapacityFull}
 	}
 	memory = memory * Mem1MB
@@ -187,7 +220,7 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isA
 	a.cond.L.Lock()
 	if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) {
-		t = &resourceToken{decrement: func() {}, err: CapacityFull}
+		t = &resourceToken{err: CapacityFull}
 	} else {
 		t = a.allocResourcesLocked(memory, cpuQuota, isAsync)
 	}
@@ -196,7 +229,7 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isA
 	return t
 }
-func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
+func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync bool) <-chan ResourceToken {
 	ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan")
 	ch := make(chan ResourceToken)
@@ -217,7 +250,7 @@ func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uin
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken {
+func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken {
 	if isNB {
 		return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync)
 	}
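The GetUtilization accounting above simply sums the async and sync pools: used is what both pools have handed out, and available is the combined totals minus that, with the used counters read under the tracker's lock. Below is a toy, self-contained version with made-up numbers; the field names are copied from the diff, but the struct and values are illustrative, and judging by the models.MilliCPUs conversions the CPU fields appear to be milli-CPU units while the RAM fields are bytes.

package main

import "fmt"

// Illustrative only: mirrors the arithmetic in GetUtilization above with
// hard-coded numbers. CPU fields are assumed to be milli-CPUs, RAM in bytes.
type tracker struct {
	cpuAsyncUsed, cpuSyncUsed   uint64
	cpuAsyncTotal, cpuSyncTotal uint64
	ramAsyncUsed, ramSyncUsed   uint64
	ramAsyncTotal, ramSyncTotal uint64
}

func main() {
	t := tracker{
		cpuAsyncUsed: 500, cpuSyncUsed: 250,
		cpuAsyncTotal: 2000, cpuSyncTotal: 2000,
		ramAsyncUsed: 256 << 20, ramSyncUsed: 128 << 20, // 256 MiB + 128 MiB in use
		ramAsyncTotal: 1 << 30, ramSyncTotal: 1 << 30, // 1 GiB per pool
	}

	cpuUsed := t.cpuAsyncUsed + t.cpuSyncUsed              // 750 mCPU
	cpuAvail := t.cpuAsyncTotal + t.cpuSyncTotal - cpuUsed // 3250 mCPU
	memUsed := t.ramAsyncUsed + t.ramSyncUsed              // 384 MiB
	memAvail := t.ramAsyncTotal + t.ramSyncTotal - memUsed // 1664 MiB

	fmt.Println(cpuUsed, cpuAvail, memUsed, memAvail)
}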


@@ -20,7 +20,7 @@ import (
 type Slot interface {
 	exec(ctx context.Context, call *call) error
-	Close(ctx context.Context) error
+	Close() error
 	Error() error
 }


@@ -22,7 +22,7 @@ func (a *testSlot) exec(ctx context.Context, call *call) error {
 	return nil
 }
-func (a *testSlot) Close(ctx context.Context) error {
+func (a *testSlot) Close() error {
 	if a.isClosed {
 		panic(fmt.Errorf("id=%d already closed %v", a.id, a))
 	}
@@ -55,7 +55,7 @@ func checkGetTokenId(t *testing.T, a *slotQueue, dur time.Duration, id uint64) e
 			continue
 		}
-		z.slot.Close(ctx)
+		z.slot.Close()
 		if z.id != id {
 			return fmt.Errorf("Bad slotToken received: %#v expected: %d", z, id)


@@ -64,6 +64,13 @@ func statsContainerEvicted(ctx context.Context) {
 	stats.Record(ctx, containerEvictedMeasure.M(0))
 }
+func statsUtilization(ctx context.Context, util ResourceUtilization) {
+	stats.Record(ctx, utilCpuUsedMeasure.M(int64(util.CpuUsed)))
+	stats.Record(ctx, utilCpuAvailMeasure.M(int64(util.CpuAvail)))
+	stats.Record(ctx, utilMemUsedMeasure.M(int64(util.MemUsed)))
+	stats.Record(ctx, utilMemAvailMeasure.M(int64(util.MemAvail)))
+}
+
 const (
 	//
 	// WARNING: Dual Role Metrics both used in Runner/Agent and LB-Agent
@@ -101,6 +108,11 @@ const (
 	containerEvictedMetricName = "container_evictions"
+	utilCpuUsedMetricName  = "util_cpu_used"
+	utilCpuAvailMetricName = "util_cpu_avail"
+	utilMemUsedMetricName  = "util_mem_used"
+	utilMemAvailMetricName = "util_mem_avail"
+
 	// Reported By LB
 	runnerSchedLatencyMetricName = "lb_runner_sched_latency"
 	runnerExecLatencyMetricName  = "lb_runner_exec_latency"
@@ -119,6 +131,11 @@ var (
 	containerGaugeMeasures = initContainerGaugeMeasures()
 	containerTimeMeasures  = initContainerTimeMeasures()
+	utilCpuUsedMeasure   = common.MakeMeasure(utilCpuUsedMetricName, "agent cpu in use", "")
+	utilCpuAvailMeasure  = common.MakeMeasure(utilCpuAvailMetricName, "agent cpu available", "")
+	utilMemUsedMeasure   = common.MakeMeasure(utilMemUsedMetricName, "agent memory in use", "By")
+	utilMemAvailMeasure  = common.MakeMeasure(utilMemAvailMetricName, "agent memory available", "By")
+
 	containerEvictedMeasure = common.MakeMeasure(containerEvictedMetricName, "containers evicted", "")
 	// Reported By LB: How long does a runner scheduler wait for a committed call? eg. wait/launch/pull containers
@@ -148,6 +165,10 @@ func RegisterAgentViews(tagKeys []string, latencyDist []float64) {
 		common.CreateView(timedoutMeasure, view.Sum(), tagKeys),
 		common.CreateView(errorsMeasure, view.Sum(), tagKeys),
 		common.CreateView(serverBusyMeasure, view.Sum(), tagKeys),
+		common.CreateView(utilCpuUsedMeasure, view.LastValue(), tagKeys),
+		common.CreateView(utilCpuAvailMeasure, view.LastValue(), tagKeys),
+		common.CreateView(utilMemUsedMeasure, view.LastValue(), tagKeys),
+		common.CreateView(utilMemAvailMeasure, view.LastValue(), tagKeys),
 	)
 	if err != nil {
 		logrus.WithError(err).Fatal("cannot register view")