mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: adding hot launcher eviction waiting (#1257)
If checkLaunch triggers evictions, it must wait for these eviction to complete before returning. Premature returning from checkLaunch will cause checkLaunch to be called again by hot launcher. This causes checkLaunch to receive an out of capacity error and causes a 503. The evictor is also improved with this PR and it provides a slice of channels to wait on if evictions are taking place. Eviction token deletion is performed *after* resource token close to ensure that once an eviction is done, resource token is also free.
This commit is contained in:
@@ -451,6 +451,8 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
|
||||
|
||||
mem := call.Memory + uint64(call.TmpFsSize)
|
||||
|
||||
var notifyChans []chan struct{}
|
||||
|
||||
// WARNING: Tricky flow below. We are here because: isNewContainerNeeded is true,
|
||||
// in other words, we need to launch a new container at this time due to high load.
|
||||
//
|
||||
@@ -471,9 +473,13 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
|
||||
select {
|
||||
case tok := <-a.resources.GetResourceToken(ctx, mem, call.CPUs, isNB):
|
||||
if tok != nil && tok.Error() != nil {
|
||||
// before returning error response, as a last resort, try evicting idle containers.
|
||||
if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
|
||||
if tok.Error() != CapacityFull {
|
||||
tryNotify(notifyChan, tok.Error())
|
||||
} else {
|
||||
notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
|
||||
if len(notifyChans) == 0 {
|
||||
tryNotify(notifyChan, tok.Error())
|
||||
}
|
||||
}
|
||||
} else if a.shutWg.AddSession(1) {
|
||||
go func() {
|
||||
@@ -492,12 +498,25 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
|
||||
// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
|
||||
// idle container.
|
||||
case <-time.After(a.cfg.HotPoll):
|
||||
a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
|
||||
notifyChans = a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
|
||||
case <-ctx.Done(): // timeout
|
||||
case <-a.shutWg.Closer(): // server shutdown
|
||||
}
|
||||
|
||||
state.UpdateState(ctx, ContainerStateDone, call.slots)
|
||||
defer state.UpdateState(ctx, ContainerStateDone, call.slots)
|
||||
|
||||
// IMPORTANT: we wait here for any possible evictions to finalize. Otherwise
|
||||
// hotLauncher could call checkLaunch again and cause a capacity full (http 503)
|
||||
// error.
|
||||
for _, wait := range notifyChans {
|
||||
select {
|
||||
case <-wait:
|
||||
case <-ctx.Done(): // timeout
|
||||
return
|
||||
case <-a.shutWg.Closer(): // server shutdown
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// waitHot pings and waits for a hot container from the slot queue
|
||||
@@ -936,6 +955,12 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
ctx, span := trace.StartSpan(ctx, "agent_run_hot")
|
||||
defer span.End()
|
||||
|
||||
// IMPORTANT: evict token is deleted *after* resource token in defer statements below.
|
||||
// This ordering allows resource token to be freed first, which means once evict token
|
||||
// is deleted, eviction is considered to be completed.
|
||||
evictor := a.evictor.CreateEvictToken(call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
|
||||
defer a.evictor.DeleteEvictToken(evictor)
|
||||
|
||||
statsUtilization(ctx, a.resources.GetUtilization())
|
||||
defer func() {
|
||||
statsUtilization(ctx, a.resources.GetUtilization())
|
||||
@@ -1039,7 +1064,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
udsClient: udsClient,
|
||||
containerSpan: trace.FromContext(ctx).SpanContext(),
|
||||
}
|
||||
if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
|
||||
if !a.runHotReq(ctx, call, state, logger, cookie, slot, evictor) {
|
||||
return
|
||||
}
|
||||
// wait for this call to finish
|
||||
@@ -1141,7 +1166,7 @@ func inotifyAwait(ctx context.Context, iofsDir string) error {
|
||||
// runHotReq enqueues a free slot to slot queue manager and watches various timers and the consumer until
|
||||
// the slot is consumed. A return value of false means, the container should shutdown and no subsequent
|
||||
// calls should be made to this function.
|
||||
func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, logger logrus.FieldLogger, cookie drivers.Cookie, slot *hotSlot) bool {
|
||||
func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, logger logrus.FieldLogger, cookie drivers.Cookie, slot *hotSlot, evictor *EvictToken) bool {
|
||||
|
||||
var err error
|
||||
isFrozen := false
|
||||
@@ -1149,10 +1174,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
|
||||
freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
|
||||
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
|
||||
evictor := a.evictor.GetEvictor(call.ID, call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
|
||||
|
||||
defer func() {
|
||||
a.evictor.UnregisterEvictor(evictor)
|
||||
evictor.SetEvictable(false)
|
||||
freezeTimer.Stop()
|
||||
idleTimer.Stop()
|
||||
// log if any error is encountered
|
||||
@@ -1161,7 +1185,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
}
|
||||
}()
|
||||
|
||||
a.evictor.RegisterEvictor(evictor)
|
||||
evictor.SetEvictable(true)
|
||||
state.UpdateState(ctx, ContainerStateIdle, call.slots)
|
||||
|
||||
s := call.slots.queueSlot(slot)
|
||||
@@ -1189,7 +1213,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
|
||||
break
|
||||
}
|
||||
|
||||
a.evictor.UnregisterEvictor(evictor)
|
||||
evictor.SetEvictable(false)
|
||||
|
||||
// if we can acquire token, that means we are here due to
|
||||
// abort/shutdown/timeout, attempt to acquire and terminate,
|
||||
|
||||
Reference in New Issue
Block a user