Merge pull request #353 from fnproject/zzz-moar

fix slot races
Chad Arimura
2017-09-22 09:50:40 -07:00
committed by GitHub


@@ -25,7 +25,6 @@ import (
 // TODO async calls need to add route.Headers as well
 // TODO need to shut off reads/writes in dispatch to the pipes when call times out so that
 // 2 calls don't have the same container's pipes...
-// TODO fix ram token / cold slot racy races (highly unlikely, still fix)
 // TODO add spans back around container launching for hot (follows from?) + other more granular spans
 // TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
 // see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
@@ -249,11 +248,16 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
     default:
     }
+    // add context cancel here to prevent ramToken/launch race, w/o this ramToken /
+    // launch won't know whether we are no longer receiving or not yet receiving.
+    ctx, launchCancel := context.WithCancel(ctx)
+    defer launchCancel()
     // if nothing free, wait for ram token or a slot
     select {
     case s := <-slots:
         return s, nil
-    case tok := <-a.ramToken(call.Memory * 1024 * 1024): // convert MB TODO mangle
+    case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle
         errCh = a.launch(ctx, slots, call, tok) // TODO mangle
     case <-ctx.Done():
         return nil, ctx.Err()
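The WithCancel added above is the heart of the fix: once launchOrSlot returns, the deferred cancel tells a still-running ramToken/launch goroutine that nobody is receiving anymore, so it can release whatever it reserved. A minimal, standalone sketch of that shape (simplified types and a hypothetical waitForToken helper, not the agent's actual code):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForToken simulates reserving capacity and handing it to the caller.
// If the caller's ctx is cancelled first, it releases the reservation itself
// instead of leaking it.
func waitForToken(ctx context.Context, ch chan<- int) {
	time.Sleep(50 * time.Millisecond) // pretend we waited for capacity
	tok := 1                          // pretend this is a RAM reservation
	select {
	case ch <- tok: // the caller is still receiving and now owns the token
	case <-ctx.Done(): // the caller gave up; undo the reservation here
	}
}

// acquire mirrors the launchOrSlot shape: deriving a cancelable ctx means that
// returning from this function (timeout or otherwise) is visible to the
// goroutine above, so a token is never sent into the void.
func acquire(parent context.Context) (int, error) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	ch := make(chan int)
	go waitForToken(ctx, ch)

	select {
	case tok := <-ch:
		return tok, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := acquire(ctx)
	fmt.Println(err) // context deadline exceeded
}

The important property is that every exit path from acquire runs cancel, so the goroutine is never left holding a reservation with no receiver.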
@@ -352,27 +356,22 @@ func (t *token) Close() error {
     return nil
 }
-// NOTE in theory goroutines spawned here could run forever (i.e. leak), if the provided value
-// is unable to be satisfied. the calling thread will time out waiting for it and the value
-// properly adjusted if ever satisfied, but we could trivially provide a ctx here and time
-// out if the calling thread times out if we want to [just to prevent leaks].
-//
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *agent) ramToken(memory uint64) <-chan Token {
-    // TODO we could do an initial check here and return in the same thread so
-    // that a calling thread could call this and have a filled channel
-    // immediately so that a a select default case could be used to determine
-    // whether machine is at capacity (and caller can decide whether to wait) --
-    // right now this is a race if used as described.
+func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
     c := a.cond
     ch := make(chan Token)
     go func() {
         c.L.Lock()
         for (a.ramUsed + memory) > a.ramTotal {
-            // TODO we could add ctx here
+            select {
+            case <-ctx.Done():
+                c.L.Unlock()
+                return
+            default:
+            }
             c.Wait()
         }
@@ -387,10 +386,8 @@ func (a *agent) ramToken(memory uint64) <-chan Token {
         }}
         select {
-        // TODO fix this race. caller needs to provide channel since we could get here
-        // before ramToken has returned. :( or something better, idk
         case ch <- t:
-        default:
+        case <-ctx.Done():
             // if we can't send b/c nobody is waiting anymore, need to decrement here
             t.Close()
         }
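Taken together, the two ramToken hunks make both ends of the token's lifecycle context-aware: the wait loop bails out between wakeups if the call is gone, and the final send either hands the token to a receiver or closes it. A rough standalone sketch of that pattern, with a simplified pool type standing in for the agent's fields (not the actual implementation):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pool is a stand-in for the agent's RAM accounting: used/total bytes guarded
// by a mutex, with a cond to wake waiters when memory is released.
type pool struct {
	mu    sync.Mutex
	cond  *sync.Cond
	used  uint64
	total uint64
}

func newPool(total uint64) *pool {
	p := &pool{total: total}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// reserve returns a channel that eventually yields a release func once mem
// bytes fit. If ctx is cancelled, the goroutine unwinds instead of leaking,
// and any reservation it already made is undone.
func (p *pool) reserve(ctx context.Context, mem uint64) <-chan func() {
	ch := make(chan func())
	go func() {
		p.mu.Lock()
		for p.used+mem > p.total {
			select {
			case <-ctx.Done():
				p.mu.Unlock()
				return
			default:
			}
			p.cond.Wait() // still blocks until the next Broadcast, as in the diff
		}
		p.used += mem
		p.mu.Unlock()

		release := func() {
			p.mu.Lock()
			p.used -= mem
			p.mu.Unlock()
			p.cond.Broadcast()
		}

		select {
		case ch <- release: // a receiver takes ownership and must call release()
		case <-ctx.Done(): // nobody is receiving anymore: undo the reservation here
			release()
		}
	}()
	return ch
}

func main() {
	p := newPool(512)
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	select {
	case release := <-p.reserve(ctx, 256):
		fmt.Println("got capacity")
		release() // give it back and wake any other waiters
	case <-ctx.Done():
		fmt.Println("gave up waiting; the goroutine cleans up after itself")
	}
}

As in the diff, the cancellation check only runs between cond wakeups, so a cancelled waiter still needs a Broadcast to actually unblock; the point of the change is that a reservation is never left dangling without an owner.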
@@ -502,9 +499,6 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
 // this will work for hot & cold (woo)
 // if launch encounters a non-nil error it will send it on the returned channel,
 // this can be useful if an image doesn't exist, e.g.
-// TODO i don't like how this came out and it's slightly racy w/ unbuffered channels since
-// the enclosing thread may not be ready to receive (up to go scheduler), need to tidy up, but
-// this race is unlikely (still need to fix, yes)
 func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
     ch := make(chan error, 1)
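launch returns a channel with a buffer of one, so (assuming it sends at most one error, which is what the buffer size suggests) it can report a failure such as a missing image and exit even if the caller has already timed out and stopped receiving. A small illustration of that producer/consumer shape, with a hypothetical doWork in place of the real launch body:

package main

import (
	"errors"
	"fmt"
	"time"
)

// startWork mirrors the launch signature in spirit: the buffer of one means
// the send below can never block, so the worker goroutine cannot leak even
// when the caller has moved on.
func startWork(doWork func() error) <-chan error {
	ch := make(chan error, 1)
	go func() {
		ch <- doWork() // succeeds immediately thanks to the buffer
	}()
	return ch
}

func main() {
	errCh := startWork(func() error {
		time.Sleep(50 * time.Millisecond) // e.g. an image pull that fails
		return errors.New("image not found")
	})

	select {
	case err := <-errCh:
		fmt.Println("launch failed:", err)
	case <-time.After(10 * time.Millisecond):
		fmt.Println("caller moved on; the worker still finishes and exits")
	}
}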
@@ -550,8 +544,8 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
     slot := &coldSlot{cookie, tok}
     select {
-    case slots <- slot: // TODO need to make sure receiver will be ready (go routine race)
-    default:
+    case slots <- slot:
+    case <-ctx.Done():
         slot.Close() // if we can't send this slot, need to take care of it ourselves
     }
     return nil
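The prepCold change replaces a non-blocking send with a wait on ctx: with default:, the hand-off fails whenever the receiving goroutine simply has not reached its receive yet, which is the scheduler race the removed TODO describes, whereas waiting on ctx.Done() means the slot is only cleaned up when the call is truly abandoned. A toy demonstration of the difference (hypothetical helpers, not the agent's code):

package main

import (
	"context"
	"fmt"
	"time"
)

// racyHandoff drops the value whenever the receiver goroutine has not been
// scheduled onto its receive yet.
func racyHandoff(slots chan string) {
	select {
	case slots <- "cold-slot":
	default:
		fmt.Println("racy: dropped the slot although a receiver was on its way")
	}
}

// safeHandoff blocks until a receiver takes the slot or the call is abandoned
// (ctx done), so the slot is never dropped just because of scheduling.
func safeHandoff(ctx context.Context, slots chan string) {
	select {
	case slots <- "cold-slot":
		fmt.Println("safe: slot handed to the receiver")
	case <-ctx.Done():
		fmt.Println("safe: call abandoned, cleaning the slot up ourselves")
	}
}

func main() {
	slots := make(chan string) // unbuffered, like the agent's slots channel

	go func() {
		for range slots {
			// consume slots as they arrive
		}
	}()

	racyHandoff(slots) // typically hits default: the receiver isn't scheduled yet

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	safeHandoff(ctx, slots) // succeeds once the receiver goroutine runs
}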