diff --git a/api/agent/agent.go b/api/agent/agent.go
index c644abe9a..e39c0bc52 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -452,6 +452,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	mem := call.Memory + uint64(call.TmpFsSize)
 
 	var notifyChans []chan struct{}
+	var tok ResourceToken
 
 	// WARNING: Tricky flow below. We are here because: isNewContainerNeeded is true,
 	// in other words, we need to launch a new container at this time due to high load.
@@ -471,12 +472,28 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	// Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not
 	// need to start a new container, then waiters will wait.
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
-		if tok != nil && tok.Error() != nil {
+	case tok = <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
+	case <-time.After(a.cfg.HotPoll):
+		// Request routines are polling us at this a.cfg.HotPoll frequency. We can use this
+		// same timer to assume that we waited long enough for cpu/mem. Let's try to evict an
+		// idle container. We do this by submitting a non-blocking request and evicting the
+		// required amount of resources.
+		select {
+		case tok = <-a.resources.GetResourceToken(ctx, mem, call.CPUs, true):
+		case <-ctx.Done(): // timeout
+		case <-a.shutWg.Closer(): // server shutdown
+		}
+	case <-ctx.Done(): // timeout
+	case <-a.shutWg.Closer(): // server shutdown
+	}
+
+	if tok != nil {
+		if tok.Error() != nil {
 			if tok.Error() != CapacityFull {
 				tryNotify(notifyChan, tok.Error())
 			} else {
-				notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
+				needMem, needCpu := tok.NeededCapacity()
+				notifyChans = a.evictor.PerformEviction(call.slotHashId, needMem, uint64(needCpu))
 				if len(notifyChans) == 0 {
 					tryNotify(notifyChan, tok.Error())
 				}
@@ -490,17 +507,8 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 			// early return (do not allow container state to switch to ContainerStateDone)
 			return
 		}
-		if tok != nil {
-			statsUtilization(ctx, a.resources.GetUtilization())
-			tok.Close()
-		}
-	// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
-	// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
-	// idle container.
-	case <-time.After(a.cfg.HotPoll):
-		notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
-	case <-ctx.Done(): // timeout
-	case <-a.shutWg.Closer(): // server shutdown
+		statsUtilization(ctx, a.resources.GetUtilization())
+		tok.Close()
 	}
 
 	defer state.UpdateState(ctx, ContainerStateDone, call.slots)
diff --git a/api/agent/resource.go b/api/agent/resource.go
index dfa60e81d..0825d5fb5 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -96,11 +96,14 @@ type ResourceToken interface {
 	// Close must be called by any thread that receives a token.
 	io.Closer
 	Error() error
+	NeededCapacity() (uint64, models.MilliCPUs)
 }
 
 type resourceToken struct {
 	once      sync.Once
 	err       error
+	needCpu   models.MilliCPUs
+	needMem   uint64
 	decrement func()
 }
 
@@ -108,6 +111,10 @@ func (t *resourceToken) Error() error {
 	return t.err
 }
 
+func (t *resourceToken) NeededCapacity() (uint64, models.MilliCPUs) {
+	return t.needMem, t.needCpu
+}
+
 func (t *resourceToken) Close() error {
 	t.once.Do(func() {
 		if t.decrement != nil {
@@ -168,18 +175,29 @@ func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.Mi
 func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
 	if !a.IsResourcePossible(memory, cpuQuota) {
-		return &resourceToken{err: CapacityFull}
+		return &resourceToken{err: CapacityFull, needCpu: cpuQuota, needMem: memory}
 	}
 
 	memory = memory * Mem1MB
 
 	var t ResourceToken
+	var needMem uint64
+	var needCpu models.MilliCPUs
 
 	a.cond.L.Lock()
 
-	if !a.isResourceAvailableLocked(memory, cpuQuota) {
-		t = &resourceToken{err: CapacityFull}
-	} else {
+	availMem := a.ramTotal - a.ramUsed
+	availCPU := a.cpuTotal - a.cpuUsed
+
+	if availMem >= memory && availCPU >= uint64(cpuQuota) {
 		t = a.allocResourcesLocked(memory, cpuQuota)
-	}
+	} else {
+		if availMem < memory {
+			needMem = (memory - availMem) / Mem1MB
+		}
+		if availCPU < uint64(cpuQuota) {
+			needCpu = models.MilliCPUs(uint64(cpuQuota) - availCPU)
+		}
+		t = &resourceToken{err: CapacityFull, needCpu: needCpu, needMem: needMem}
+	}
 
 	a.cond.L.Unlock()
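
Note on the checkLaunch change: the old code only tried eviction from a dedicated HotPoll select case and sized it by the call's full footprint; the new flow waits on a token, and after one HotPoll interval retries non-blocking so the failed token itself can report what to evict. The following is a minimal standalone sketch of that wait pattern; getToken is a hypothetical stand-in for a.resources.GetResourceToken, and the real agent additionally selects on a.shutWg.Closer() for shutdown.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errCapacityFull = errors.New("CapacityFull")

type token struct{ err error }

// getToken mimics GetResourceToken: a non-blocking request fails fast with a
// CapacityFull token, a blocking one waits until resources free up (never, in
// this demo) or the context ends.
func getToken(ctx context.Context, nonBlocking bool) <-chan *token {
	ch := make(chan *token, 1)
	go func() {
		if nonBlocking {
			ch <- &token{err: errCapacityFull}
			return
		}
		<-ctx.Done() // pretend capacity never frees up
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	hotPoll := 200 * time.Millisecond // stands in for a.cfg.HotPoll
	var tok *token

	select {
	case tok = <-getToken(ctx, false):
	case <-time.After(hotPoll):
		// Waited one poll interval without capacity: retry non-blocking so
		// the failed token can report how much we are short, then evict.
		select {
		case tok = <-getToken(ctx, true):
		case <-ctx.Done():
		}
	case <-ctx.Done():
	}

	if tok != nil && errors.Is(tok.err, errCapacityFull) {
		fmt.Println("would call PerformEviction with the token's needed capacity")
	}
}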
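Note on the resource.go change: getResourceTokenNB now records only the per-dimension shortfall in the failed token, so the evictor frees just enough idle containers to cover the deficit instead of the call's entire footprint. A standalone sketch of that arithmetic, with hypothetical local names (tracker, milliCPUs, mem1MB) in place of resourceTracker, models.MilliCPUs, and Mem1MB; note the unit round trip, where the caller asks in MB, the tracker accounts in bytes, and the deficit is converted back to MB:

package main

import "fmt"

type milliCPUs uint64

const mem1MB = uint64(1024 * 1024)

type tracker struct {
	ramTotal, ramUsed uint64 // bytes
	cpuTotal, cpuUsed uint64 // milli-CPUs
}

// neededCapacity reports how far short the tracker falls of a request for
// memMB megabytes and cpu milli-CPUs; zero means that dimension already fits.
func (t *tracker) neededCapacity(memMB uint64, cpu milliCPUs) (needMem uint64, needCpu milliCPUs) {
	memory := memMB * mem1MB // request arrives in MB, accounting is in bytes

	availMem := t.ramTotal - t.ramUsed
	availCPU := t.cpuTotal - t.cpuUsed

	if availMem < memory {
		needMem = (memory - availMem) / mem1MB // deficit, back in MB
	}
	if availCPU < uint64(cpu) {
		needCpu = milliCPUs(uint64(cpu) - availCPU)
	}
	return needMem, needCpu
}

func main() {
	t := &tracker{ramTotal: 1024 * mem1MB, ramUsed: 900 * mem1MB, cpuTotal: 2000, cpuUsed: 1500}
	needMem, needCpu := t.neededCapacity(256, 1000)
	fmt.Printf("short by %d MB and %d mCPU\n", needMem, needCpu) // short by 132 MB and 500 mCPU
}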