fn: remove async+sync separation in resource tracker (#1254)

This simplifies the resource tracker. Originally, we had
logically split the cpu/mem into two pools, where 20%
was reserved specifically for sync calls to keep
async calls from dominating the system. However, the
resource tracker should not handle such call prioritization.
Given the improvements to the evictor, I think
we can get rid of this code in the resource tracker
for the time being.
Tolga Ceylan
2018-10-01 10:46:32 -07:00
committed by GitHub
parent 927424092b
commit 2e610a264a
4 changed files with 98 additions and 309 deletions
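
To make the before/after concrete, here is a minimal sketch of the split-pool availability check versus the single-pool check that replaces it. This is illustrative only, not code from this commit; field names follow the diff below, and CPU follows the same shape as memory.

package main

import "fmt"

// oldTracker mirrors the removed split-pool fields (sketch only).
type oldTracker struct {
	ramSyncTotal, ramSyncUsed   uint64 // ~20% reserve, sync-only
	ramAsyncTotal, ramAsyncUsed uint64 // remaining ~80%; sync may spill in
}

func (t *oldTracker) available(memory uint64, isAsync bool) bool {
	asyncAvail := t.ramAsyncTotal - t.ramAsyncUsed
	syncAvail := t.ramSyncTotal - t.ramSyncUsed
	if isAsync {
		return asyncAvail >= memory // async confined to its own pool
	}
	return asyncAvail+syncAvail >= memory // sync may draw on both pools
}

// newTracker mirrors the single-pool fields this commit introduces (sketch only).
type newTracker struct {
	ramTotal, ramUsed uint64
}

func (t *newTracker) available(memory uint64) bool {
	return t.ramTotal-t.ramUsed >= memory
}

func main() {
	// 4GB machine under the old split: 1GB sync reserve, 3GB async pool (full).
	old := oldTracker{ramSyncTotal: 1024, ramAsyncTotal: 3072, ramAsyncUsed: 3072}
	fmt.Println(old.available(512, true))  // false: async pool exhausted
	fmt.Println(old.available(512, false)) // true: sync reserve still free
	// Same machine, same usage, one pool: any call type may use the remainder.
	now := newTracker{ramTotal: 4096, ramUsed: 3072}
	fmt.Println(now.available(512)) // true
}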

View File

@@ -441,7 +441,6 @@ func tryNotify(notifyChan chan error, err error) {
 func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) {
 	curStats := call.slots.getStats()
-	isAsync := call.Type == models.TypeAsync
 	isNB := a.cfg.EnableNBResourceTracker
 	if !isNewContainerNeeded(&curStats) {
 		return
@@ -470,7 +469,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	// Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not
 	// need to start a new container, then waiters will wait.
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB):
+	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
 		if tok != nil && tok.Error() != nil {
 			// before returning error response, as a last resort, try evicting idle containers.
 			if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
@@ -551,7 +550,6 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 // launchCold waits for necessary resources to launch a new container, then
 // returns the slot for that new container to run the request on.
 func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
-	isAsync := call.Type == models.TypeAsync
 	isNB := a.cfg.EnableNBResourceTracker
 
 	ch := make(chan Slot)
@@ -564,7 +562,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	mem := call.Memory + uint64(call.TmpFsSize)
 
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isAsync, isNB):
+	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
 		if tok.Error() != nil {
 			return nil, tok.Error()
 		}
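
Note the fallback kept in checkLaunch above: when a token comes back CapacityFull, the agent asks the evictor to free idle containers before failing the call, which is what lets a single pool absorb the load the sync reserve used to guarantee. A self-contained sketch of that control flow follows; every type here is a stand-in for illustration, not the agent's real code.

package main

import (
	"errors"
	"fmt"
)

var errCapacityFull = errors.New("capacity full") // stand-in for CapacityFull

type token struct{ err error }

func (t *token) Error() error { return t.err }

type evictor struct{ idleMB uint64 }

// PerformEviction frees idle containers; true if enough capacity was reclaimed.
func (e *evictor) PerformEviction(needMB uint64) bool {
	if e.idleMB >= needMB {
		e.idleMB -= needMB
		return true
	}
	return false
}

func handle(tok *token, ev *evictor, needMB uint64) string {
	if tok.Error() != nil {
		// Last resort before failing: evict idle containers, then retry launch.
		if tok.Error() == errCapacityFull && ev.PerformEviction(needMB) {
			return "evicted idle containers, retrying launch"
		}
		return "fail: " + tok.Error().Error()
	}
	return "launch"
}

func main() {
	ev := &evictor{idleMB: 512}
	fmt.Println(handle(&token{err: errCapacityFull}, ev, 256)) // eviction frees room
	fmt.Println(handle(&token{err: errCapacityFull}, ev, 512)) // not enough idle left
	fmt.Println(handle(&token{}, ev, 0))                       // token granted
}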

View File

@@ -259,7 +259,7 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) {
 	}
 	mem := c.Memory + uint64(c.TmpFsSize)
-	if !a.resources.IsResourcePossible(mem, c.CPUs, c.Type == models.TypeAsync) {
+	if !a.resources.IsResourcePossible(mem, c.CPUs) {
 		return nil, models.ErrCallResourceTooBig
 	}

View File

@@ -41,7 +41,7 @@ type ResourceUtilization struct {
 }
 
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
-// TODO: add cpu, disk, network IO for future
+// TODO: disk, network IO for future
 type ResourceTracker interface {
 	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
 	// resource levels to run an async task, it is up to the implementer to create policy here.
@@ -52,14 +52,13 @@ type ResourceTracker interface {
 	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
 	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
 	// if isNB is set, resource check is done and error token is returned without blocking.
-	// if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow
-	// a sync only reserve area) Memory is expected to be provided in MB units.
-	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken
+	// Memory is expected to be provided in MB units.
+	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken
 	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
 	// Memory is expected to be provided in MB units.
-	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool
+	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool
 	// Retrieve current stats/usage
 	GetUtilization() ResourceUtilization
@@ -68,26 +67,17 @@ type ResourceTracker interface {
 type resourceTracker struct {
 	// cond protects access to ram variables below
 	cond *sync.Cond
-	// ramTotal is the total usable memory for sync functions
-	ramSyncTotal uint64
-	// ramSyncUsed is ram reserved for running sync containers including hot/idle
-	ramSyncUsed uint64
-	// ramAsyncTotal is the total usable memory for async + sync functions
-	ramAsyncTotal uint64
-	// ramAsyncUsed is ram reserved for running async + sync containers including hot/idle
-	ramAsyncUsed uint64
-	// memory in use for async area in which agent stops dequeuing async jobs
+	// ramTotal is the total usable memory for functions
+	ramTotal uint64
+	// ramUsed is ram reserved for running containers including hot/idle
+	ramUsed uint64
+	// memory in use in which agent stops dequeuing async jobs
 	ramAsyncHWMark uint64
-	// cpuTotal is the total usable cpu for sync functions
-	cpuSyncTotal uint64
-	// cpuSyncUsed is cpu reserved for running sync containers including hot/idle
-	cpuSyncUsed uint64
-	// cpuAsyncTotal is the total usable cpu for async + sync functions
-	cpuAsyncTotal uint64
-	// cpuAsyncUsed is cpu reserved for running async + sync containers including hot/idle
-	cpuAsyncUsed uint64
-	// cpu in use for async area in which agent stops dequeuing async jobs
+	// cpuTotal is the total usable cpu for functions
+	cpuTotal uint64
+	// cpuUsed is cpu reserved for running containers including hot/idle
+	cpuUsed uint64
+	// cpu in use in which agent stops dequeuing async jobs
 	cpuAsyncHWMark uint64
 }
@@ -127,20 +117,12 @@ func (t *resourceToken) Close() error {
 	return nil
 }
 
-func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool {
-	asyncAvailMem := a.ramAsyncTotal - a.ramAsyncUsed
-	syncAvailMem := a.ramSyncTotal - a.ramSyncUsed
-	asyncAvailCPU := a.cpuAsyncTotal - a.cpuAsyncUsed
-	syncAvailCPU := a.cpuSyncTotal - a.cpuSyncUsed
-
-	// For sync functions, we can steal from async pool. For async, we restrict it to sync pool
-	if isAsync {
-		return asyncAvailMem >= memory && asyncAvailCPU >= uint64(cpuQuota)
-	} else {
-		return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= uint64(cpuQuota)
-	}
+func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs) bool {
+	availMem := a.ramTotal - a.ramUsed
+	availCPU := a.cpuTotal - a.cpuUsed
+	return availMem >= memory && availCPU >= uint64(cpuQuota)
 }
 
 func (a *resourceTracker) GetUtilization() ResourceUtilization {
@@ -148,58 +130,33 @@ func (a *resourceTracker) GetUtilization() ResourceUtilization {
 	a.cond.L.Lock()
-	util.CpuUsed = models.MilliCPUs(a.cpuAsyncUsed + a.cpuSyncUsed)
-	util.MemUsed = a.ramAsyncUsed + a.ramSyncUsed
+	util.CpuUsed = models.MilliCPUs(a.cpuUsed)
+	util.MemUsed = a.ramUsed
 	a.cond.L.Unlock()
 
-	util.CpuAvail = models.MilliCPUs(a.cpuAsyncTotal+a.cpuSyncTotal) - util.CpuUsed
-	util.MemAvail = a.ramAsyncTotal + a.ramSyncTotal - util.MemUsed
+	util.CpuAvail = models.MilliCPUs(a.cpuTotal) - util.CpuUsed
+	util.MemAvail = a.ramTotal - util.MemUsed
 
 	return util
 }
 
 // is this request possible to meet? If no, fail quick
-func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) bool {
+func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool {
 	memory = memory * Mem1MB
-
-	if isAsync {
-		return memory <= a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuAsyncTotal
-	} else {
-		return memory <= a.ramSyncTotal+a.ramAsyncTotal && uint64(cpuQuota) <= a.cpuSyncTotal+a.cpuAsyncTotal
-	}
+	return memory <= a.ramTotal && uint64(cpuQuota) <= a.cpuTotal
 }
 
-func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken {
-	var asyncMem, syncMem uint64
-	var asyncCPU, syncCPU uint64
-
-	if isAsync {
-		// async uses async pool only
-		asyncMem = memory
-		asyncCPU = uint64(cpuQuota)
-	} else {
-		// if sync fits async + sync pool
-		syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory)
-		syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, uint64(cpuQuota))
-		asyncMem = memory - syncMem
-		asyncCPU = uint64(cpuQuota) - syncCPU
-	}
-
-	a.ramAsyncUsed += asyncMem
-	a.ramSyncUsed += syncMem
-	a.cpuAsyncUsed += asyncCPU
-	a.cpuSyncUsed += syncCPU
+func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
+	a.ramUsed += memory
+	a.cpuUsed += uint64(cpuQuota)
 
 	return &resourceToken{decrement: func() {
 		a.cond.L.Lock()
-		a.ramAsyncUsed -= asyncMem
-		a.ramSyncUsed -= syncMem
-		a.cpuAsyncUsed -= asyncCPU
-		a.cpuSyncUsed -= syncCPU
+		a.ramUsed -= memory
+		a.cpuUsed -= uint64(cpuQuota)
 		a.cond.L.Unlock()
 
 		// WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but
@@ -209,8 +166,8 @@ func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.Mi
 	}}
 }
 
-func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs, isAsync bool) ResourceToken {
-	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
+func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
+	if !a.IsResourcePossible(memory, cpuQuota) {
 		return &resourceToken{err: CapacityFull}
 	}
 	memory = memory * Mem1MB
@@ -219,23 +176,23 @@ func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.Mill
 	a.cond.L.Lock()
-	if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) {
+	if !a.isResourceAvailableLocked(memory, cpuQuota) {
 		t = &resourceToken{err: CapacityFull}
 	} else {
-		t = a.allocResourcesLocked(memory, cpuQuota, isAsync)
+		t = a.allocResourcesLocked(memory, cpuQuota)
 	}
 	a.cond.L.Unlock()
 
 	return t
 }
 
-func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync bool) <-chan ResourceToken {
+func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) <-chan ResourceToken {
 	ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan")
 	ch := make(chan ResourceToken)
 
 	go func() {
 		defer span.End()
-		t := a.getResourceTokenNB(memory, cpuQuota, isAsync)
+		t := a.getResourceTokenNB(memory, cpuQuota)
 		select {
 		case ch <- t:
@@ -250,14 +207,14 @@ func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uin
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isAsync, isNB bool) <-chan ResourceToken {
+func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken {
 	if isNB {
-		return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync)
+		return a.getResourceTokenNBChan(ctx, memory, cpuQuota)
 	}
 
 	ch := make(chan ResourceToken)
-	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
+	if !a.IsResourcePossible(memory, cpuQuota) {
 		// return the channel, but never send anything.
 		return ch
 	}
@@ -287,7 +244,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 		c.L.Lock()
 		isWaiting = true
-		for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
+		for !a.isResourceAvailableLocked(memory, cpuQuota) && ctx.Err() == nil {
 			c.Wait()
 		}
 		isWaiting = false
@@ -297,7 +254,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 			return
 		}
-		t := a.allocResourcesLocked(memory, cpuQuota, isAsync)
+		t := a.allocResourcesLocked(memory, cpuQuota)
 		c.L.Unlock()
 
 		select {
@@ -338,7 +295,7 @@ func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
 		defer cancel()
 		c.L.Lock()
 		isWaiting = true
-		for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
+		for (a.ramUsed >= a.ramAsyncHWMark || a.cpuUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
 			c.Wait()
 		}
 		isWaiting = false
@@ -374,8 +331,6 @@ func clampUint64(val, min, max uint64) uint64 {
 func (a *resourceTracker) initializeCPU(cfg *Config) {
-	var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64
-
 	// Use all available CPU from go.runtime in non-linux systems. We ignore
 	// non-linux container implementations and their limits on CPU if there's any.
 	// (This is also the default if we cannot determine limits from proc or sysfs)
@@ -417,36 +372,25 @@ func (a *resourceTracker) initializeCPU(cfg *Config) {
 		"availCPU": availCPU,
 	}).Info("available cpu")
 
-	// %20 of cpu for sync only reserve
-	maxSyncCPU = uint64(availCPU * 2 / 10)
-	maxAsyncCPU = availCPU - maxSyncCPU
-	cpuAsyncHWMark = maxAsyncCPU * 8 / 10
+	a.cpuTotal = availCPU
+	a.cpuAsyncHWMark = availCPU * 8 / 10
 
 	logrus.WithFields(logrus.Fields{
-		"cpuSync":        maxSyncCPU,
-		"cpuAsync":       maxAsyncCPU,
-		"cpuAsyncHWMark": cpuAsyncHWMark,
-	}).Info("sync and async cpu reservations")
+		"cpu":            a.cpuTotal,
+		"cpuAsyncHWMark": a.cpuAsyncHWMark,
+	}).Info("cpu reservations")
 
-	if maxSyncCPU == 0 || maxAsyncCPU == 0 {
+	if a.cpuTotal == 0 {
 		logrus.Fatal("Cannot get the proper CPU information to size server")
 	}
-	if maxSyncCPU+maxAsyncCPU < 1000 {
-		logrus.Warn("Severaly Limited CPU: cpuSync + cpuAsync < 1000m (1 CPU)")
-	} else if maxAsyncCPU < 1000 {
-		logrus.Warn("Severaly Limited CPU: cpuAsync < 1000m (1 CPU)")
+	if a.cpuTotal < 1000 {
+		logrus.Warn("Severaly Limited CPU: cpu < 1000m (1 CPU)")
 	}
-
-	a.cpuAsyncHWMark = cpuAsyncHWMark
-	a.cpuSyncTotal = maxSyncCPU
-	a.cpuAsyncTotal = maxAsyncCPU
 }
 
 func (a *resourceTracker) initializeMemory(cfg *Config) {
-	var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
-
 	availMemory := uint64(DefaultNonLinuxMemory)
 
 	if runtime.GOOS == "linux" {
@@ -486,32 +430,22 @@ func (a *resourceTracker) initializeMemory(cfg *Config) {
 		availMemory = minUint64(cfg.MaxTotalMemory, availMemory)
 	}
 
-	// %20 of ram for sync only reserve
-	maxSyncMemory = uint64(availMemory * 2 / 10)
-	maxAsyncMemory = availMemory - maxSyncMemory
-	ramAsyncHWMark = maxAsyncMemory * 8 / 10
+	a.ramTotal = availMemory
+	a.ramAsyncHWMark = availMemory * 8 / 10
 
 	// For non-linux OS, we expect these (or their defaults) properly configured from command-line/env
 	logrus.WithFields(logrus.Fields{
-		"availMemory":    availMemory,
-		"ramSync":        maxSyncMemory,
-		"ramAsync":       maxAsyncMemory,
-		"ramAsyncHWMark": ramAsyncHWMark,
-	}).Info("sync and async ram reservations")
+		"availMemory":    a.ramTotal,
+		"ramAsyncHWMark": a.ramAsyncHWMark,
+	}).Info("ram reservations")
 
-	if maxSyncMemory == 0 || maxAsyncMemory == 0 {
+	if a.ramTotal == 0 {
 		logrus.Fatal("Cannot get the proper memory pool information to size server")
 	}
-	if maxSyncMemory+maxAsyncMemory < 256*Mem1MB {
-		logrus.Warn("Severely Limited memory: ramSync + ramAsync < 256MB")
-	} else if maxAsyncMemory < 256*Mem1MB {
-		logrus.Warn("Severely Limited memory: ramAsync < 256MB")
+	if a.ramTotal < 256*Mem1MB {
		logrus.Warn("Severely Limited memory: ram < 256MB")
 	}
-
-	a.ramAsyncHWMark = ramAsyncHWMark
-	a.ramSyncTotal = maxSyncMemory
-	a.ramAsyncTotal = maxAsyncMemory
 }
 
 // headroom estimation in order not to consume entire RAM if possible
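
With the split gone, initializeCPU and initializeMemory above size a single pool and derive the async high-water marks as 80% of it; WaitAsyncResource stops dequeuing async jobs while usage sits at or above those marks. A worked example of the arithmetic, using illustrative machine sizes rather than values from the commit:

package main

import "fmt"

func main() {
	const Mem1MB = 1024 * 1024
	availMemory := uint64(4096 * Mem1MB) // assume 4GB usable RAM
	availCPU := uint64(4000)             // assume 4 CPUs, in MilliCPUs

	ramTotal := availMemory
	ramAsyncHWMark := availMemory * 8 / 10 // async dequeue pauses above this
	cpuTotal := availCPU
	cpuAsyncHWMark := availCPU * 8 / 10

	fmt.Println(ramTotal/Mem1MB, ramAsyncHWMark/Mem1MB) // 4096 3276 (MB)
	fmt.Println(cpuTotal, cpuAsyncHWMark)               // 4000 3200 (m)
}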

View File

@@ -10,16 +10,12 @@ import (
 func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
 	tr.cond.L.Lock()
-	tr.ramSyncTotal = vals.mst
-	tr.ramSyncUsed = vals.msu
-	tr.ramAsyncTotal = vals.mat
-	tr.ramAsyncUsed = vals.mau
+	tr.ramTotal = vals.mt
+	tr.ramUsed = vals.mu
 	tr.ramAsyncHWMark = vals.mam
-	tr.cpuSyncTotal = vals.cst
-	tr.cpuSyncUsed = vals.csu
-	tr.cpuAsyncTotal = vals.cat
-	tr.cpuAsyncUsed = vals.cau
+	tr.cpuTotal = vals.ct
+	tr.cpuUsed = vals.cu
 	tr.cpuAsyncHWMark = vals.cam
 	tr.cond.L.Unlock()
@@ -30,16 +26,12 @@ func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
 	tr.cond.L.Lock()
-	vals.mst = tr.ramSyncTotal
-	vals.msu = tr.ramSyncUsed
-	vals.mat = tr.ramAsyncTotal
-	vals.mau = tr.ramAsyncUsed
+	vals.mt = tr.ramTotal
+	vals.mu = tr.ramUsed
 	vals.mam = tr.ramAsyncHWMark
-	vals.cst = tr.cpuSyncTotal
-	vals.csu = tr.cpuSyncUsed
-	vals.cat = tr.cpuAsyncTotal
-	vals.cau = tr.cpuAsyncUsed
+	vals.ct = tr.cpuTotal
+	vals.cu = tr.cpuUsed
 	vals.cam = tr.cpuAsyncHWMark
 	tr.cond.L.Unlock()
@@ -47,31 +39,23 @@ func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
 // helper to debug print (fields correspond to resourceTracker CPU/MEM fields)
 type trackerVals struct {
-	mst uint64
-	msu uint64
-	mat uint64
-	mau uint64
+	mt  uint64
+	mu  uint64
 	mam uint64
-	cst uint64
-	csu uint64
-	cat uint64
-	cau uint64
+	ct  uint64
+	cu  uint64
 	cam uint64
 }
 
 func (vals *trackerVals) setDefaults() {
-	// set set these to known vals (4GB total: 1GB sync, 3 async)
-	vals.mst = 1 * Mem1GB
-	vals.msu = 0
-	vals.mat = 3 * Mem1GB
-	vals.mau = 0
+	// set set these to known vals (4GB total: 1GB async hw mark)
+	vals.mt = 4 * Mem1GB
+	vals.mu = 0
 	vals.mam = 1 * Mem1GB
 
-	// let's assume 10 CPUs (2 CPU sync, 8 CPU async)
-	vals.cst = 2000
-	vals.csu = 0
-	vals.cat = 8000
-	vals.cau = 0
+	// let's assume 10 CPUs (6 CPU async hw mark)
+	vals.ct = 10000
+	vals.cu = 0
 	vals.cam = 6000
 }
@@ -104,17 +88,17 @@ func TestResourceAsyncWait(t *testing.T) {
 	tr := trI.(*resourceTracker)
 
 	getTrackerTestVals(tr, &vals)
-	if vals.mst <= 0 || vals.msu != 0 || vals.mat <= 0 || vals.mau != 0 || vals.mam <= 0 {
+	if vals.mt <= 0 || vals.mu != 0 || vals.mam <= 0 {
 		t.Fatalf("faulty init MEM %#v", vals)
 	}
-	if vals.cst <= 0 || vals.csu != 0 || vals.cat <= 0 || vals.cau != 0 || vals.cam <= 0 {
+	if vals.ct <= 0 || vals.cu != 0 || vals.cam <= 0 {
 		t.Fatalf("faulty init CPU %#v", vals)
 	}
 
 	vals.setDefaults()
 
 	// should block & wait
-	vals.mau = vals.mam
+	vals.mu = vals.mam
 	setTrackerTestVals(tr, &vals)
 
 	ctx1, cancel1 := context.WithCancel(context.Background())
@@ -128,7 +112,7 @@ func TestResourceAsyncWait(t *testing.T) {
 	}
 
 	// should not block & wait
-	vals.mau = 0
+	vals.mu = 0
 	setTrackerTestVals(tr, &vals)
 
 	select {
@@ -143,7 +127,7 @@ func TestResourceAsyncWait(t *testing.T) {
 	defer cancel2()
 
 	// should block & wait
-	vals.cau = vals.cam
+	vals.cu = vals.cam
 	setTrackerTestVals(tr, &vals)
 
 	select {
@@ -153,7 +137,7 @@ func TestResourceAsyncWait(t *testing.T) {
 	}
 
 	// should not block & wait
-	vals.cau = 0
+	vals.cu = 0
 	setTrackerTestVals(tr, &vals)
 
 	select {
@@ -172,14 +156,14 @@ func TestResourceGetSimple(t *testing.T) {
 	vals.setDefaults()
 
 	// let's make it like CPU and MEM are 100% full
-	vals.mau = vals.mat
-	vals.cau = vals.cat
+	vals.mu = vals.mt
+	vals.cu = vals.ct
 	setTrackerTestVals(tr, &vals)
 
 	// ask for 4GB and 10 CPU
 	ctx, cancel := context.WithCancel(context.Background())
-	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
+	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false)
 	defer cancel()
 
 	_, err := fetchToken(ch)
@@ -198,7 +182,7 @@ func TestResourceGetSimple(t *testing.T) {
 	// ask for another 4GB and 10 CPU
 	ctx, cancel = context.WithCancel(context.Background())
-	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
+	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false)
 	defer cancel()
 
 	_, err = fetchToken(ch)
@@ -218,10 +202,10 @@ func TestResourceGetSimple(t *testing.T) {
 	// POOLS should all be empty now
 	getTrackerTestVals(tr, &vals)
-	if vals.msu != 0 || vals.mau != 0 {
+	if vals.mu != 0 {
 		t.Fatalf("faulty state MEM %#v", vals)
 	}
-	if vals.csu != 0 || vals.cau != 0 {
+	if vals.cu != 0 {
 		t.Fatalf("faulty state CPU %#v", vals)
 	}
 }
@@ -235,14 +219,14 @@ func TestResourceGetSimpleNB(t *testing.T) {
 	vals.setDefaults()
 
 	// let's make it like CPU and MEM are 100% full
-	vals.mau = vals.mat
-	vals.cau = vals.cat
+	vals.mu = vals.mt
+	vals.cu = vals.ct
 	setTrackerTestVals(tr, &vals)
 
 	// ask for 4GB and 10 CPU
 	ctx, cancel := context.WithCancel(context.Background())
-	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	ch := trI.GetResourceToken(ctx, 4*1024, 1000, true)
 	defer cancel()
 
 	tok := <-ch
@@ -254,14 +238,14 @@ func TestResourceGetSimpleNB(t *testing.T) {
 	vals.setDefaults()
 	setTrackerTestVals(tr, &vals)
 
-	tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, true)
 	if tok1.Error() != nil {
 		t.Fatalf("empty system should hand out token")
 	}
 
 	// ask for another 4GB and 10 CPU
 	ctx, cancel = context.WithCancel(context.Background())
-	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	ch = trI.GetResourceToken(ctx, 4*1024, 1000, true)
 	defer cancel()
 
 	tok = <-ch
@@ -272,7 +256,7 @@ func TestResourceGetSimpleNB(t *testing.T) {
 	// close means, giant token resources released
 	tok1.Close()
 
-	tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, true)
 	if tok.Error() != nil {
 		t.Fatalf("empty system should hand out token")
 	}
@@ -281,137 +265,10 @@ func TestResourceGetSimpleNB(t *testing.T) {
 	// POOLS should all be empty now
 	getTrackerTestVals(tr, &vals)
-	if vals.msu != 0 || vals.mau != 0 {
+	if vals.mu != 0 {
 		t.Fatalf("faulty state MEM %#v", vals)
 	}
-	if vals.csu != 0 || vals.cau != 0 {
+	if vals.cu != 0 {
 		t.Fatalf("faulty state CPU %#v", vals)
 	}
 }
-
-func TestResourceGetCombo(t *testing.T) {
-	var vals trackerVals
-	trI := NewResourceTracker(nil)
-	tr := trI.(*resourceTracker)
-
-	vals.setDefaults()
-	setTrackerTestVals(tr, &vals)
-
-	// impossible request
-	ctx, cancel := context.WithCancel(context.Background())
-	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false)
-	_, err := fetchToken(ch)
-	if err == nil {
-		t.Fatalf("impossible request should never return (error here)")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// let's use up 2 GB of 3GB async pool
-	ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false)
-	tok1, err := fetchToken(ch)
-	if err != nil {
-		t.Fatalf("empty async system should hand out token1")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// remaining 1 GB async
-	ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false)
-	tok2, err := fetchToken(ch)
-	if err != nil {
-		t.Fatalf("empty async system should hand out token2")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// NOW ASYNC POOL IS FULL
-	// SYNC POOL HAS 1GB
-
-	// we no longer can get async token
-	ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false)
-	_, err = fetchToken(ch)
-	if err == nil {
-		t.Fatalf("full async system should not hand out a token")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// but we should get 1GB sync token
-	ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false)
-	tok3, err := fetchToken(ch)
-	if err != nil {
-		t.Fatalf("empty sync system should hand out token3")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// NOW ASYNC AND SYNC POOLS ARE FULL
-
-	// this should fail
-	ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false)
-	_, err = fetchToken(ch)
-	if err == nil {
-		t.Fatalf("full system should not hand out a token")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// now let's free up some async pool, release tok2 (1GB)
-	tok2.Close()
-
-	// NOW ASYNC POOL HAS 1GB FREE
-	// SYNC POOL IS FULL
-
-	// async pool should provide this
-	ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false)
-	tok4, err := fetchToken(ch)
-	if err != nil {
-		t.Fatalf("async system should hand out token4")
-	}
-	cancel()
-	ctx, cancel = context.WithCancel(context.Background())
-
-	// NOW ASYNC AND SYNC POOLS ARE FULL
-
-	tok4.Close()
-	tok3.Close()
-
-	// NOW ASYNC POOL HAS 1GB FREE
-	// SYNC POOL HAS 1GB FREE
-
-	// now, we ask for 2GB sync token, it should be provided from both async+sync pools
-	ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false)
-	tok5, err := fetchToken(ch)
-	if err != nil {
-		t.Fatalf("async+sync system should hand out token5")
-	}
-	cancel()
-
-	// NOW ASYNC AND SYNC POOLS ARE FULL
-
-	tok1.Close()
-	tok5.Close()
-
-	// attempt to close tok2 twice.. This should be OK.
-	tok2.Close()
-
-	// POOLS should all be empty now
-	getTrackerTestVals(tr, &vals)
-	if vals.msu != 0 || vals.mau != 0 {
-		t.Fatalf("faulty state MEM %#v", vals)
-	}
-	if vals.csu != 0 || vals.cau != 0 {
-		t.Fatalf("faulty state CPU %#v", vals)
-	}
-}
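
The deleted TestResourceGetCombo exercised cross-pool spilling, which no longer exists; the surviving tests cover one pool plus token release, including Close being safe to call twice. A self-contained sketch of that accounting, with illustrative types rather than the tracker's real ones:

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu       sync.Mutex
	ramTotal uint64
	ramUsed  uint64
}

type token struct {
	once    sync.Once
	release func()
}

// Close releases the token's resources exactly once; extra calls are no-ops.
func (t *token) Close() { t.once.Do(t.release) }

// get hands out a token if the single pool has room, else nil (CapacityFull
// in the real tracker).
func (p *pool) get(memory uint64) *token {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.ramTotal-p.ramUsed < memory {
		return nil
	}
	p.ramUsed += memory
	return &token{release: func() {
		p.mu.Lock()
		p.ramUsed -= memory
		p.mu.Unlock()
	}}
}

func main() {
	p := &pool{ramTotal: 4096}
	tok := p.get(4096)
	fmt.Println(p.get(1) == nil) // true: pool exhausted
	tok.Close()
	tok.Close()            // double close is a no-op
	fmt.Println(p.ramUsed) // 0: pool empty again
}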