Bugfix: use the most recent call for ephemeral data

Things like pull tokens (which can be attached to an incoming call) may
be effectively ephemeral - they have a shorter lifetime than the
slotQueue.

Rather than having hotLauncher use the *original* call that spawned it
for this metadata, we update the slotQueue to hold an atomic reference
to the *most recent* call.

Any tokens on new calls will therefore be used by preference — meaning
that we won't see a spurious expired token used for image pulls, etc.

Cost here is a single atomic Store/Load, which is probably as good as
it can get. The call extensions should be immutable for this process.
This commit is contained in:
jan grant
2019-07-18 15:41:44 +01:00
parent f1de7d7a07
commit bcd1a3d284
2 changed files with 17 additions and 12 deletions

View File

@@ -371,6 +371,9 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
caller.notify = make(chan error)
}
// Unconditionally update the most recent call (with any pull tokens or other ephemeral matter)
call.slots.recentCall.Store(call)
if isNew {
go a.hotLauncher(ctx, call, caller)
}
@@ -381,12 +384,12 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
// hotLauncher is spawned in a go routine for each slot queue to monitor stats and launch hot
// containers if needed. Upon shutdown or activity timeout, hotLauncher exits and during exit,
// it destroys the slot queue.
func (a *agent) hotLauncher(ctx context.Context, call *call, caller *slotCaller) {
func (a *agent) hotLauncher(ctx context.Context, originalCall *call, caller *slotCaller) {
// Let's use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
// whichever is longer. If in this time, there's no activity, then
// we destroy the hot queue.
timeout := a.cfg.HotLauncherTimeout
idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
idleTimeout := time.Duration(originalCall.IdleTimeout) * time.Second * 2
if timeout < idleTimeout {
timeout = idleTimeout
}
@@ -402,7 +405,8 @@ func (a *agent) hotLauncher(ctx context.Context, call *call, caller *slotCaller)
for {
ctx, cancel := context.WithTimeout(ctx, timeout)
a.checkLaunch(ctx, call, *caller)
currentCall := originalCall.slots.recentCall.Load().(*call)
a.checkLaunch(ctx, currentCall, *caller)
select {
case <-a.shutWg.Closer(): // server shutdown
@@ -410,11 +414,11 @@ func (a *agent) hotLauncher(ctx context.Context, call *call, caller *slotCaller)
return
case <-ctx.Done(): // timed out
cancel()
if a.slotMgr.deleteSlotQueue(call.slots) {
if a.slotMgr.deleteSlotQueue(originalCall.slots) {
logger.Debug("Hot function launcher timed out")
return
}
case caller = <-call.slots.signaller:
case caller = <-originalCall.slots.signaller:
cancel()
}
}

View File

@@ -51,13 +51,14 @@ type slotCaller struct {
// LIFO queue that exposes input/output channels along
// with runner/waiter tracking for agent
type slotQueue struct {
key string
cond *sync.Cond
slots []*slotToken
nextId uint64
signaller chan *slotCaller
statsLock sync.Mutex // protects stats below
stats slotQueueStats
key string
cond *sync.Cond
slots []*slotToken
nextId uint64
signaller chan *slotCaller
statsLock sync.Mutex // protects stats below
stats slotQueueStats
recentCall atomic.Value // pointer to the most recent Call object
}
func NewSlotQueueMgr() *slotQueueMgr {