From 2e610a264a726522dab0365376bf7fc550fa61fe Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Mon, 1 Oct 2018 10:46:32 -0700 Subject: [PATCH] fn: remove async+sync seperation in resource tracker (#1254) This simplifies resource tracker. Originally, logically we had split the cpu/mem into two pools where a 20% was kept specifically for sync calls to avoid async calls dominating the system. However, resource tracker should not handle such call prioritization. Given the improvements to the evictor, I think we can get rid of this code in resource tracker for time being. --- api/agent/agent.go | 6 +- api/agent/call.go | 2 +- api/agent/resource.go | 180 ++++++++++-------------------- api/agent/resource_test.go | 219 +++++++------------------------------ 4 files changed, 98 insertions(+), 309 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 8a816b764..aafb3b298 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -441,7 +441,6 @@ func tryNotify(notifyChan chan error, err error) { func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) { curStats := call.slots.getStats() - isAsync := call.Type == models.TypeAsync isNB := a.cfg.EnableNBResourceTracker if !isNewContainerNeeded(&curStats) { return @@ -470,7 +469,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err // Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not // need to start a new container, then waiters will wait. select { - case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB): if tok != nil && tok.Error() != nil { // before returning error response, as a last resort, try evicting idle containers. if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) { @@ -551,7 +550,6 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { // launchCold waits for necessary resources to launch a new container, then // returns the slot for that new container to run the request on. func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { - isAsync := call.Type == models.TypeAsync isNB := a.cfg.EnableNBResourceTracker ch := make(chan Slot) @@ -564,7 +562,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { mem := call.Memory + uint64(call.TmpFsSize) select { - case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB): + case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB): if tok.Error() != nil { return nil, tok.Error() } diff --git a/api/agent/call.go b/api/agent/call.go index f8990f61d..4c458d650 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -259,7 +259,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { } mem := c.Memory + uint64(c.TmpFsSize) - if !a.resources.IsResourcePossible(mem, c.CPUs, c.Type == models.TypeAsync) { + if !a.resources.IsResourcePossible(mem, c.CPUs) { return nil, models.ErrCallResourceTooBig } diff --git a/api/agent/resource.go b/api/agent/resource.go index 7ce8f96aa..dfa60e81d 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -41,7 +41,7 @@ type ResourceUtilization struct { } // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. -// TODO: add cpu, disk, network IO for future +// TODO: disk, network IO for future type ResourceTracker interface { // WaitAsyncResource returns a channel that will send once when there seem to be sufficient // resource levels to run an async task, it is up to the implementer to create policy here. @@ -52,14 +52,13 @@ type ResourceTracker interface { // will never receive anything (use IsResourcePossible). If a resource token is available for the provided // resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed. // if isNB is set, resource check is done and error token is returned without blocking. - // if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow - // a sync only reserve area) Memory is expected to be provided in MB units. - GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken + // Memory is expected to be provided in MB units. + GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken // IsResourcePossible returns whether it's possible to fulfill the requested resources on this // machine. It must be called before GetResourceToken or GetResourceToken may hang. // Memory is expected to be provided in MB units. - IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool + IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool // Retrieve current stats/usage GetUtilization() ResourceUtilization @@ -68,26 +67,17 @@ type ResourceTracker interface { type resourceTracker struct { // cond protects access to ram variables below cond *sync.Cond - // ramTotal is the total usable memory for sync functions - ramSyncTotal uint64 - // ramSyncUsed is ram reserved for running sync containers including hot/idle - ramSyncUsed uint64 - // ramAsyncTotal is the total usable memory for async + sync functions - ramAsyncTotal uint64 - // ramAsyncUsed is ram reserved for running async + sync containers including hot/idle - ramAsyncUsed uint64 - // memory in use for async area in which agent stops dequeuing async jobs + // ramTotal is the total usable memory for functions + ramTotal uint64 + // ramUsed is ram reserved for running containers including hot/idle + ramUsed uint64 + // memory in use in which agent stops dequeuing async jobs ramAsyncHWMark uint64 - - // cpuTotal is the total usable cpu for sync functions - cpuSyncTotal uint64 - // cpuSyncUsed is cpu reserved for running sync containers including hot/idle - cpuSyncUsed uint64 - // cpuAsyncTotal is the total usable cpu for async + sync functions - cpuAsyncTotal uint64 - // cpuAsyncUsed is cpu reserved for running async + sync containers including hot/idle - cpuAsyncUsed uint64 - // cpu in use for async area in which agent stops dequeuing async jobs + // cpuTotal is the total usable cpu for functions + cpuTotal uint64 + // cpuUsed is cpu reserved for running containers including hot/idle + cpuUsed uint64 + // cpu in use in which agent stops dequeuing async jobs cpuAsyncHWMark uint64 } @@ -127,20 +117,12 @@ func (t *resourceToken) Close() error { return nil } -func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool { +func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs) bool { - asyncAvailMem := a.ramAsyncTotal - a.ramAsyncUsed - syncAvailMem := a.ramSyncTotal - a.ramSyncUsed + availMem := a.ramTotal - a.ramUsed + availCPU := a.cpuTotal - a.cpuUsed - asyncAvailCPU := a.cpuAsyncTotal - a.cpuAsyncUsed - syncAvailCPU := a.cpuSyncTotal - a.cpuSyncUsed - - // For sync functions, we can steal from async pool. For async, we restrict it to sync pool - if isAsync { - return asyncAvailMem >= memory && asyncAvailCPU >= uint64(cpuQuota) - } else { - return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= uint64(cpuQuota) - } + return availMem >= memory && availCPU >= uint64(cpuQuota) } func (a *resourceTracker) GetUtilization() ResourceUtilization { @@ -148,58 +130,33 @@ func (a *resourceTracker) GetUtilization() ResourceUtilization { a.cond.L.Lock() - util.CpuUsed = models.MilliCPUs(a.cpuAsyncUsed + a.cpuSyncUsed) - util.MemUsed = a.ramAsyncUsed + a.ramSyncUsed + util.CpuUsed = models.MilliCPUs(a.cpuUsed) + util.MemUsed = a.ramUsed a.cond.L.Unlock() - util.CpuAvail = models.MilliCPUs(a.cpuAsyncTotal+a.cpuSyncTotal) - util.CpuUsed - util.MemAvail = a.ramAsyncTotal + a.ramSyncTotal - util.MemUsed + util.CpuAvail = models.MilliCPUs(a.cpuTotal) - util.CpuUsed + util.MemAvail = a.ramTotal - util.MemUsed return util } // is this request possible to meet? If no, fail quick -func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool { +func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool { memory = memory * Mem1MB - - if isAsync { - return memory <= a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuAsyncTotal - } else { - return memory <= a.ramSyncTotal+a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuSyncTotal+a.cpuAsyncTotal - } + return memory <= a.ramTotal && uint64(cpuQuota) <= a.cpuTotal } -func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken { +func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs) ResourceToken { - var asyncMem, syncMem uint64 - var asyncCPU, syncCPU uint64 - - if isAsync { - // async uses async pool only - asyncMem = memory - asyncCPU = uint64(cpuQuota) - } else { - // if sync fits async + sync pool - syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory) - syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, uint64(cpuQuota)) - - asyncMem = memory - syncMem - asyncCPU = uint64(cpuQuota) - syncCPU - } - - a.ramAsyncUsed += asyncMem - a.ramSyncUsed += syncMem - a.cpuAsyncUsed += asyncCPU - a.cpuSyncUsed += syncCPU + a.ramUsed += memory + a.cpuUsed += uint64(cpuQuota) return &resourceToken{decrement: func() { a.cond.L.Lock() - a.ramAsyncUsed -= asyncMem - a.ramSyncUsed -= syncMem - a.cpuAsyncUsed -= asyncCPU - a.cpuSyncUsed -= syncCPU + a.ramUsed -= memory + a.cpuUsed -= uint64(cpuQuota) a.cond.L.Unlock() // WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but @@ -209,8 +166,8 @@ func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.Mi }} } -func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken { - if !a.IsResourcePossible(memory, cpuQuota, isAsync) { +func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs) ResourceToken { + if !a.IsResourcePossible(memory, cpuQuota) { return &resourceToken{err: CapacityFull} } memory = memory * Mem1MB @@ -219,23 +176,23 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.Mill a.cond.L.Lock() - if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) { + if !a.isResourceAvailableLocked(memory, cpuQuota) { t = &resourceToken{err: CapacityFull} } else { - t = a.allocResourcesLocked(memory, cpuQuota, isAsync) + t = a.allocResourcesLocked(memory, cpuQuota) } a.cond.L.Unlock() return t } -func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync bool) <-chan ResourceToken { +func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) <-chan ResourceToken { ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan") ch := make(chan ResourceToken) go func() { defer span.End() - t := a.getResourceTokenNB(memory, cpuQuota, isAsync) + t := a.getResourceTokenNB(memory, cpuQuota) select { case ch <- t: @@ -250,14 +207,14 @@ func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uin // the received token should be passed directly to launch (unconditionally), launch // will close this token (i.e. the receiver should not call Close) -func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken { +func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken { if isNB { - return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync) + return a.getResourceTokenNBChan(ctx, memory, cpuQuota) } ch := make(chan ResourceToken) - if !a.IsResourcePossible(memory, cpuQuota, isAsync) { + if !a.IsResourcePossible(memory, cpuQuota) { // return the channel, but never send anything. return ch } @@ -287,7 +244,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c c.L.Lock() isWaiting = true - for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil { + for !a.isResourceAvailableLocked(memory, cpuQuota) && ctx.Err() == nil { c.Wait() } isWaiting = false @@ -297,7 +254,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c return } - t := a.allocResourcesLocked(memory, cpuQuota, isAsync) + t := a.allocResourcesLocked(memory, cpuQuota) c.L.Unlock() select { @@ -338,7 +295,7 @@ func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} { defer cancel() c.L.Lock() isWaiting = true - for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil { + for (a.ramUsed >= a.ramAsyncHWMark || a.cpuUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil { c.Wait() } isWaiting = false @@ -374,8 +331,6 @@ func clampUint64(val, min, max uint64) uint64 { func (a *resourceTracker) initializeCPU(cfg *Config) { - var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64 - // Use all available CPU from go.runtime in non-linux systems. We ignore // non-linux container implementations and their limits on CPU if there's any. // (This is also the default if we cannot determine limits from proc or sysfs) @@ -417,36 +372,25 @@ func (a *resourceTracker) initializeCPU(cfg *Config) { "availCPU": availCPU, }).Info("available cpu") - // %20 of cpu for sync only reserve - maxSyncCPU = uint64(availCPU * 2 / 10) - maxAsyncCPU = availCPU - maxSyncCPU - cpuAsyncHWMark = maxAsyncCPU * 8 / 10 + a.cpuTotal = availCPU + a.cpuAsyncHWMark = availCPU * 8 / 10 logrus.WithFields(logrus.Fields{ - "cpuSync": maxSyncCPU, - "cpuAsync": maxAsyncCPU, - "cpuAsyncHWMark": cpuAsyncHWMark, - }).Info("sync and async cpu reservations") + "cpu": a.cpuTotal, + "cpuAsyncHWMark": a.cpuAsyncHWMark, + }).Info("cpu reservations") - if maxSyncCPU == 0 || maxAsyncCPU == 0 { + if a.cpuTotal == 0 { logrus.Fatal("Cannot get the proper CPU information to size server") } - if maxSyncCPU+maxAsyncCPU < 1000 { - logrus.Warn("Severaly Limited CPU: cpuSync + cpuAsync < 1000m (1 CPU)") - } else if maxAsyncCPU < 1000 { - logrus.Warn("Severaly Limited CPU: cpuAsync < 1000m (1 CPU)") + if a.cpuTotal < 1000 { + logrus.Warn("Severaly Limited CPU: cpu < 1000m (1 CPU)") } - - a.cpuAsyncHWMark = cpuAsyncHWMark - a.cpuSyncTotal = maxSyncCPU - a.cpuAsyncTotal = maxAsyncCPU } func (a *resourceTracker) initializeMemory(cfg *Config) { - var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64 - availMemory := uint64(DefaultNonLinuxMemory) if runtime.GOOS == "linux" { @@ -486,32 +430,22 @@ func (a *resourceTracker) initializeMemory(cfg *Config) { availMemory = minUint64(cfg.MaxTotalMemory, availMemory) } - // %20 of ram for sync only reserve - maxSyncMemory = uint64(availMemory * 2 / 10) - maxAsyncMemory = availMemory - maxSyncMemory - ramAsyncHWMark = maxAsyncMemory * 8 / 10 + a.ramTotal = availMemory + a.ramAsyncHWMark = availMemory * 8 / 10 // For non-linux OS, we expect these (or their defaults) properly configured from command-line/env logrus.WithFields(logrus.Fields{ - "availMemory": availMemory, - "ramSync": maxSyncMemory, - "ramAsync": maxAsyncMemory, - "ramAsyncHWMark": ramAsyncHWMark, - }).Info("sync and async ram reservations") + "availMemory": a.ramTotal, + "ramAsyncHWMark": a.ramAsyncHWMark, + }).Info("ram reservations") - if maxSyncMemory == 0 || maxAsyncMemory == 0 { + if a.ramTotal == 0 { logrus.Fatal("Cannot get the proper memory pool information to size server") } - if maxSyncMemory+maxAsyncMemory < 256*Mem1MB { - logrus.Warn("Severely Limited memory: ramSync + ramAsync < 256MB") - } else if maxAsyncMemory < 256*Mem1MB { - logrus.Warn("Severely Limited memory: ramAsync < 256MB") + if a.ramTotal < 256*Mem1MB { + logrus.Warn("Severely Limited memory: ram < 256MB") } - - a.ramAsyncHWMark = ramAsyncHWMark - a.ramSyncTotal = maxSyncMemory - a.ramAsyncTotal = maxAsyncMemory } // headroom estimation in order not to consume entire RAM if possible diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go index 4cd038610..e2d26d536 100644 --- a/api/agent/resource_test.go +++ b/api/agent/resource_test.go @@ -10,16 +10,12 @@ import ( func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) { tr.cond.L.Lock() - tr.ramSyncTotal = vals.mst - tr.ramSyncUsed = vals.msu - tr.ramAsyncTotal = vals.mat - tr.ramAsyncUsed = vals.mau + tr.ramTotal = vals.mt + tr.ramUsed = vals.mu tr.ramAsyncHWMark = vals.mam - tr.cpuSyncTotal = vals.cst - tr.cpuSyncUsed = vals.csu - tr.cpuAsyncTotal = vals.cat - tr.cpuAsyncUsed = vals.cau + tr.cpuTotal = vals.ct + tr.cpuUsed = vals.cu tr.cpuAsyncHWMark = vals.cam tr.cond.L.Unlock() @@ -30,16 +26,12 @@ func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) { tr.cond.L.Lock() - vals.mst = tr.ramSyncTotal - vals.msu = tr.ramSyncUsed - vals.mat = tr.ramAsyncTotal - vals.mau = tr.ramAsyncUsed + vals.mt = tr.ramTotal + vals.mu = tr.ramUsed vals.mam = tr.ramAsyncHWMark - vals.cst = tr.cpuSyncTotal - vals.csu = tr.cpuSyncUsed - vals.cat = tr.cpuAsyncTotal - vals.cau = tr.cpuAsyncUsed + vals.ct = tr.cpuTotal + vals.cu = tr.cpuUsed vals.cam = tr.cpuAsyncHWMark tr.cond.L.Unlock() @@ -47,31 +39,23 @@ func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) { // helper to debug print (fields correspond to resourceTracker CPU/MEM fields) type trackerVals struct { - mst uint64 - msu uint64 - mat uint64 - mau uint64 + mt uint64 + mu uint64 mam uint64 - cst uint64 - csu uint64 - cat uint64 - cau uint64 + ct uint64 + cu uint64 cam uint64 } func (vals *trackerVals) setDefaults() { - // set set these to known vals (4GB total: 1GB sync, 3 async) - vals.mst = 1 * Mem1GB - vals.msu = 0 - vals.mat = 3 * Mem1GB - vals.mau = 0 + // set set these to known vals (4GB total: 1GB async hw mark) + vals.mt = 4 * Mem1GB + vals.mu = 0 vals.mam = 1 * Mem1GB - // let's assume 10 CPUs (2 CPU sync, 8 CPU async) - vals.cst = 2000 - vals.csu = 0 - vals.cat = 8000 - vals.cau = 0 + // let's assume 10 CPUs (6 CPU async hw mark) + vals.ct = 10000 + vals.cu = 0 vals.cam = 6000 } @@ -104,17 +88,17 @@ func TestResourceAsyncWait(t *testing.T) { tr := trI.(*resourceTracker) getTrackerTestVals(tr, &vals) - if vals.mst <= 0 || vals.msu != 0 || vals.mat <= 0 || vals.mau != 0 || vals.mam <= 0 { + if vals.mt <= 0 || vals.mu != 0 || vals.mam <= 0 { t.Fatalf("faulty init MEM %#v", vals) } - if vals.cst <= 0 || vals.csu != 0 || vals.cat <= 0 || vals.cau != 0 || vals.cam <= 0 { + if vals.ct <= 0 || vals.cu != 0 || vals.cam <= 0 { t.Fatalf("faulty init CPU %#v", vals) } vals.setDefaults() // should block & wait - vals.mau = vals.mam + vals.mu = vals.mam setTrackerTestVals(tr, &vals) ctx1, cancel1 := context.WithCancel(context.Background()) @@ -128,7 +112,7 @@ func TestResourceAsyncWait(t *testing.T) { } // should not block & wait - vals.mau = 0 + vals.mu = 0 setTrackerTestVals(tr, &vals) select { @@ -143,7 +127,7 @@ func TestResourceAsyncWait(t *testing.T) { defer cancel2() // should block & wait - vals.cau = vals.cam + vals.cu = vals.cam setTrackerTestVals(tr, &vals) select { @@ -153,7 +137,7 @@ func TestResourceAsyncWait(t *testing.T) { } // should not block & wait - vals.cau = 0 + vals.cu = 0 setTrackerTestVals(tr, &vals) select { @@ -172,14 +156,14 @@ func TestResourceGetSimple(t *testing.T) { vals.setDefaults() // let's make it like CPU and MEM are 100% full - vals.mau = vals.mat - vals.cau = vals.cat + vals.mu = vals.mt + vals.cu = vals.ct setTrackerTestVals(tr, &vals) // ask for 4GB and 10 CPU ctx, cancel := context.WithCancel(context.Background()) - ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false) + ch := trI.GetResourceToken(ctx, 4*1024, 1000, false) defer cancel() _, err := fetchToken(ch) @@ -198,7 +182,7 @@ func TestResourceGetSimple(t *testing.T) { // ask for another 4GB and 10 CPU ctx, cancel = context.WithCancel(context.Background()) - ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false) + ch = trI.GetResourceToken(ctx, 4*1024, 1000, false) defer cancel() _, err = fetchToken(ch) @@ -218,10 +202,10 @@ func TestResourceGetSimple(t *testing.T) { // POOLS should all be empty now getTrackerTestVals(tr, &vals) - if vals.msu != 0 || vals.mau != 0 { + if vals.mu != 0 { t.Fatalf("faulty state MEM %#v", vals) } - if vals.csu != 0 || vals.cau != 0 { + if vals.cu != 0 { t.Fatalf("faulty state CPU %#v", vals) } } @@ -235,14 +219,14 @@ func TestResourceGetSimpleNB(t *testing.T) { vals.setDefaults() // let's make it like CPU and MEM are 100% full - vals.mau = vals.mat - vals.cau = vals.cat + vals.mu = vals.mt + vals.cu = vals.ct setTrackerTestVals(tr, &vals) // ask for 4GB and 10 CPU ctx, cancel := context.WithCancel(context.Background()) - ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + ch := trI.GetResourceToken(ctx, 4*1024, 1000, true) defer cancel() tok := <-ch @@ -254,14 +238,14 @@ func TestResourceGetSimpleNB(t *testing.T) { vals.setDefaults() setTrackerTestVals(tr, &vals) - tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, true) if tok1.Error() != nil { t.Fatalf("empty system should hand out token") } // ask for another 4GB and 10 CPU ctx, cancel = context.WithCancel(context.Background()) - ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + ch = trI.GetResourceToken(ctx, 4*1024, 1000, true) defer cancel() tok = <-ch @@ -272,7 +256,7 @@ func TestResourceGetSimpleNB(t *testing.T) { // close means, giant token resources released tok1.Close() - tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, true) if tok.Error() != nil { t.Fatalf("empty system should hand out token") } @@ -281,137 +265,10 @@ func TestResourceGetSimpleNB(t *testing.T) { // POOLS should all be empty now getTrackerTestVals(tr, &vals) - if vals.msu != 0 || vals.mau != 0 { + if vals.mu != 0 { t.Fatalf("faulty state MEM %#v", vals) } - if vals.csu != 0 || vals.cau != 0 { + if vals.cu != 0 { t.Fatalf("faulty state CPU %#v", vals) } } - -func TestResourceGetCombo(t *testing.T) { - - var vals trackerVals - trI := NewResourceTracker(nil) - tr := trI.(*resourceTracker) - - vals.setDefaults() - setTrackerTestVals(tr, &vals) - - // impossible request - ctx, cancel := context.WithCancel(context.Background()) - ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false) - _, err := fetchToken(ch) - if err == nil { - t.Fatalf("impossible request should never return (error here)") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // let's use up 2 GB of 3GB async pool - ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false) - tok1, err := fetchToken(ch) - if err != nil { - t.Fatalf("empty async system should hand out token1") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // remaining 1 GB async - ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false) - tok2, err := fetchToken(ch) - if err != nil { - t.Fatalf("empty async system should hand out token2") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // NOW ASYNC POOL IS FULL - // SYNC POOL HAS 1GB - - // we no longer can get async token - ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false) - _, err = fetchToken(ch) - if err == nil { - t.Fatalf("full async system should not hand out a token") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // but we should get 1GB sync token - ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false) - tok3, err := fetchToken(ch) - if err != nil { - t.Fatalf("empty sync system should hand out token3") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // NOW ASYNC AND SYNC POOLS ARE FULL - - // this should fail - ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false) - _, err = fetchToken(ch) - if err == nil { - t.Fatalf("full system should not hand out a token") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // now let's free up some async pool, release tok2 (1GB) - tok2.Close() - - // NOW ASYNC POOL HAS 1GB FREE - // SYNC POOL IS FULL - - // async pool should provide this - ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false) - tok4, err := fetchToken(ch) - if err != nil { - t.Fatalf("async system should hand out token4") - } - - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - // NOW ASYNC AND SYNC POOLS ARE FULL - - tok4.Close() - tok3.Close() - - // NOW ASYNC POOL HAS 1GB FREE - // SYNC POOL HAS 1GB FREE - - // now, we ask for 2GB sync token, it should be provided from both async+sync pools - ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false) - tok5, err := fetchToken(ch) - if err != nil { - t.Fatalf("async+sync system should hand out token5") - } - - cancel() - - // NOW ASYNC AND SYNC POOLS ARE FULL - - tok1.Close() - tok5.Close() - - // attempt to close tok2 twice.. This should be OK. - tok2.Close() - - // POOLS should all be empty now - getTrackerTestVals(tr, &vals) - if vals.msu != 0 || vals.mau != 0 { - t.Fatalf("faulty state MEM %#v", vals) - } - if vals.csu != 0 || vals.cau != 0 { - t.Fatalf("faulty state CPU %#v", vals) - } - -}