fn: non-blocking resource tracker and notification (#841)

* fn: non-blocking resource tracker and notification

For some types of errors, we might want to notify
the actual caller if the error is directly 1-1 tied
to that request. If hotLauncher is triggered with
signaller, then here we send a back communication
error notification channel. This is passed to
checkLaunch to send back synchronous responses
to the caller that initiated this hot container
launch.

This is useful if we want to run the agent in
quick fail mode, where instead of waiting for
CPU/Mem to become available, we prefer to fail
quick in order not to hold up the caller.
To support this, non-blocking resource tracker
option/functions are now available.

* fn: test env var rename tweak

* fn: fixup merge

* fn: rebase test fix

* fn: merge fixup

* fn: test tweak down to 70MB for 128MB total

* fn: refactor token creation and use broadcast regardless

* fn: nb description

* fn: bugfix
This commit is contained in:
Tolga Ceylan
2018-04-24 21:59:33 -07:00
committed by Reed Allman
parent 51197d4985
commit 54ba49be65
6 changed files with 359 additions and 92 deletions

View File

@@ -315,6 +315,8 @@ func transformTimeout(e error, isRetriable bool) error {
return models.ErrCallTimeoutServerBusy
}
return models.ErrCallTimeout
} else if e == CapacityFull {
return models.ErrCallTimeoutServerBusy
}
return e
}
@@ -402,9 +404,12 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
ctx, span := trace.StartSpan(ctx, "agent_hot_launcher")
defer span.End()
var notifyChan chan error
for {
ctx, cancel := context.WithTimeout(ctx, timeout)
a.checkLaunch(ctx, call)
a.checkLaunch(ctx, call, notifyChan)
notifyChan = nil
select {
case <-a.shutWg.Closer(): // server shutdown
@@ -416,15 +421,25 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
logger.Info("Hot function launcher timed out")
return
}
case <-call.slots.signaller:
case notifyChan = <-call.slots.signaller:
cancel()
}
}
}
func (a *agent) checkLaunch(ctx context.Context, call *call) {
func tryNotify(notifyChan chan error, err error) {
if notifyChan != nil && err != nil {
select {
case notifyChan <- err:
default:
}
}
}
func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) {
curStats := call.slots.getStats()
isAsync := call.Type == models.TypeAsync
isNB := a.cfg.EnableNBResourceTracker
isNeeded := isNewContainerNeeded(&curStats)
common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Debug("Hot function launcher stats")
if !isNeeded {
@@ -436,9 +451,28 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) {
common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
// WARNING: Tricky flow below. We are here because: isNeeded is set,
// in other words, we need to launch a new container at this time due to high load.
//
// For non-blocking mode, this means, if we cannot acquire resources (cpu+mem), then we need
// to notify the caller through notifyChan. This is not perfect as the callers and
// checkLaunch do not match 1-1. But this is OK, we can notify *any* waiter that
// has signalled us, this is because non-blocking mode is a system wide setting.
// The notifications are lossy, but callers will signal/poll again if this is the case
// or this may not matter if they've already acquired an empty slot.
//
// For Non-blocking mode, a.cfg.HotPoll should not be set to too high since a missed
// notify event from here will add a.cfg.HotPoll msec latency. Setting a.cfg.HotPoll may
// be an acceptable workaround for the short term since non-blocking mode likely to reduce
// the number of waiters which perhaps could compensate for more frequent polling.
//
// Non-blocking mode only applies to cpu+mem, and if isNeeded decided that we do not
// need to start a new container, then waiters will wait.
select {
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
if a.shutWg.AddSession(1) {
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
if tok != nil && tok.Error() != nil {
tryNotify(notifyChan, tok.Error())
} else if a.shutWg.AddSession(1) {
go func() {
// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
a.runHot(ctx, call, tok, state)
@@ -466,12 +500,16 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
ch := call.slots.startDequeuer(ctx)
notifyChan := make(chan error)
// 1) if we can get a slot immediately, grab it.
// 2) if we don't, send a signaller every x msecs until we do.
sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
for {
select {
case err := <-notifyChan:
return nil, err
case s := <-ch:
if call.slots.acquireSlot(s) {
if s.slot.Error() != nil {
@@ -493,7 +531,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
sleep = a.cfg.HotPoll
// send a notification to launchHot()
select {
case call.slots.signaller <- true:
case call.slots.signaller <- notifyChan:
default:
}
}
@@ -503,6 +541,8 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
// returns the slot for that new container to run the request on.
func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
isAsync := call.Type == models.TypeAsync
isNB := a.cfg.EnableNBResourceTracker
ch := make(chan Slot)
ctx, span := trace.StartSpan(ctx, "agent_launch_cold")
@@ -511,7 +551,11 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
select {
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
if tok.Error() != nil {
return nil, tok.Error()
}
go a.prepCold(ctx, call, tok, ch)
case <-ctx.Done():
return nil, ctx.Err()