Mirror of https://github.com/fnproject/fn.git (synced 2022-10-28 21:29:17 +03:00)
fn: eviction resource correction (#1282)
Previously, the evictor did not perform an eviction if the total cpu/mem of the evictable containers was less than the requested cpu/mem. With this change, we try to perform evictions based on the actual needed cpu & mem reported by the resource tracker.
committed by Dario Domizioli
parent e8e6ed2fd0
commit bf41789af2
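The core of the fix: when capacity is full, the resource token now carries the shortfall (how much more memory and CPU would still be needed), and the evictor is asked to free only that much instead of the full request. A minimal sketch of the idea, using simplified stand-in types rather than fn's actual resource tracker:

package main

import "fmt"

// Simplified stand-in for the resource tracker's bookkeeping; the real fields and
// types in fn differ, this only illustrates the shortfall computation.
type tracker struct {
	memTotal, memUsed uint64 // MB
	cpuTotal, cpuUsed uint64 // milli-CPUs
}

// neededCapacity reports how much memory/CPU is still missing for a request,
// i.e. the amount an evictor would have to free (zero when already available).
func (t *tracker) neededCapacity(reqMem, reqCPU uint64) (needMem, needCPU uint64) {
	availMem := t.memTotal - t.memUsed
	availCPU := t.cpuTotal - t.cpuUsed
	if availMem < reqMem {
		needMem = reqMem - availMem
	}
	if availCPU < reqCPU {
		needCPU = reqCPU - availCPU
	}
	return needMem, needCPU
}

func main() {
	t := &tracker{memTotal: 1024, memUsed: 896, cpuTotal: 1000, cpuUsed: 900}
	// Request 256 MB / 200 mCPU: only the 128 MB / 100 mCPU shortfall needs to be
	// evicted, not the full request, so a smaller idle container can now qualify.
	needMem, needCPU := t.neededCapacity(256, 200)
	fmt.Printf("evict at least %d MB and %d mCPU\n", needMem, needCPU)
}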
@@ -452,6 +452,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	mem := call.Memory + uint64(call.TmpFsSize)
 
 	var notifyChans []chan struct{}
+	var tok ResourceToken
 
 	// WARNING: Tricky flow below. We are here because: isNewContainerNeeded is true,
 	// in other words, we need to launch a new container at this time due to high load.
@@ -471,12 +472,28 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	// Non-blocking mode only applies to cpu+mem, and if isNewContainerNeeded decided that we do not
 	// need to start a new container, then waiters will wait.
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
-		if tok != nil && tok.Error() != nil {
+	case tok = <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
+	case <-time.After(a.cfg.HotPoll):
+		// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
+		// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
+		// idle container. We do this by submitting a non-blocking request and evicting required
+		// amount of resources.
+		select {
+		case tok = <-a.resources.GetResourceToken(ctx, mem, call.CPUs, true):
+		case <-ctx.Done(): // timeout
+		case <-a.shutWg.Closer(): // server shutdown
+		}
+	case <-ctx.Done(): // timeout
+	case <-a.shutWg.Closer(): // server shutdown
+	}
+
+	if tok != nil {
+		if tok.Error() != nil {
 			if tok.Error() != CapacityFull {
 				tryNotify(notifyChan, tok.Error())
 			} else {
-				notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
+				needMem, needCpu := tok.NeededCapacity()
+				notifyChans = a.evictor.PerformEviction(call.slotHashId, needMem, uint64(needCpu))
 				if len(notifyChans) == 0 {
 					tryNotify(notifyChan, tok.Error())
 				}
@@ -490,18 +507,9 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 			// early return (do not allow container state to switch to ContainerStateDone)
 			return
 		}
-		if tok != nil {
 		statsUtilization(ctx, a.resources.GetUtilization())
 		tok.Close()
 	}
-	// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
-	// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
-	// idle container.
-	case <-time.After(a.cfg.HotPoll):
-		notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
-	case <-ctx.Done(): // timeout
-	case <-a.shutWg.Closer(): // server shutdown
-	}
 
 	defer state.UpdateState(ctx, ContainerStateDone, call.slots)
 
@@ -96,11 +96,14 @@ type ResourceToken interface {
 	// Close must be called by any thread that receives a token.
 	io.Closer
 	Error() error
+	NeededCapacity() (uint64, models.MilliCPUs)
 }
 
 type resourceToken struct {
 	once      sync.Once
 	err       error
+	needCpu   models.MilliCPUs
+	needMem   uint64
 	decrement func()
 }
 
@@ -108,6 +111,10 @@ func (t *resourceToken) Error() error {
 	return t.err
 }
 
+func (t *resourceToken) NeededCapacity() (uint64, models.MilliCPUs) {
+	return t.needMem, t.needCpu
+}
+
 func (t *resourceToken) Close() error {
 	t.once.Do(func() {
 		if t.decrement != nil {
@@ -168,18 +175,29 @@ func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.Mi
 
 func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
 	if !a.IsResourcePossible(memory, cpuQuota) {
-		return &resourceToken{err: CapacityFull}
+		return &resourceToken{err: CapacityFull, needCpu: cpuQuota, needMem: memory}
 	}
 	memory = memory * Mem1MB
 
 	var t ResourceToken
+	var needMem uint64
+	var needCpu models.MilliCPUs
 
 	a.cond.L.Lock()
 
-	if !a.isResourceAvailableLocked(memory, cpuQuota) {
-		t = &resourceToken{err: CapacityFull}
-	} else {
+	availMem := a.ramTotal - a.ramUsed
+	availCPU := a.cpuTotal - a.cpuUsed
+
+	if availMem >= memory && availCPU >= uint64(cpuQuota) {
 		t = a.allocResourcesLocked(memory, cpuQuota)
+	} else {
+		if availMem < memory {
+			needMem = (memory - availMem) / Mem1MB
+		}
+		if availCPU < uint64(cpuQuota) {
+			needCpu = models.MilliCPUs(uint64(cpuQuota) - availCPU)
+		}
+		t = &resourceToken{err: CapacityFull, needCpu: needCpu, needMem: needMem}
 	}
 
 	a.cond.L.Unlock()
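For context on why the shortfall matters on the evictor side: if the idle containers together hold less than the full request, asking to free the full request can fail outright, while asking only for the missing amount can succeed. A hypothetical eligibility loop (not the project's actual PerformEviction implementation) that makes this concrete:

package main

import "fmt"

// container is an illustrative idle container and the resources it would free.
type container struct {
	id  string
	mem uint64 // MB
	cpu uint64 // milli-CPUs
}

// pickEvictions selects idle containers until the needed memory and CPU are covered.
// It returns nil if even evicting everything would not cover the need, mirroring the
// all-or-nothing behaviour described in the commit message.
func pickEvictions(idle []container, needMem, needCPU uint64) []container {
	var picked []container
	var freedMem, freedCPU uint64
	for _, c := range idle {
		if freedMem >= needMem && freedCPU >= needCPU {
			break
		}
		picked = append(picked, c)
		freedMem += c.mem
		freedCPU += c.cpu
	}
	if freedMem < needMem || freedCPU < needCPU {
		return nil
	}
	return picked
}

func main() {
	idle := []container{{id: "a", mem: 128, cpu: 100}}
	// Asking for the full 256 MB / 200 mCPU request finds nothing to evict.
	fmt.Println(pickEvictions(idle, 256, 200)) // []
	// Asking only for the 128 MB / 100 mCPU shortfall evicts container "a".
	fmt.Println(pickEvictions(idle, 128, 100)) // [{a 128 100}]
}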