mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fix slot races
I'd be pretty surprised if these were happening but meh, a computer running at capacity can make the runtime scheduler do all kinds of weird shit, so this locks down the behavior around slot launching. I didn't load test much as there are cries of 'wolf' running amok, and it's late, so this could be off a little -- but I think it's about this easy. cold is the only one launching slots for itself, so it should always receive its own slot (provided within time bounds). for hot we just need a way to tell the ram token allocator that we aren't there anymore, so that somebody can close the token (important). If the bug still persists then it seems likely that there is another bug around timing I'm not aware of (possible, but unlikely) or the more likely case that it's actually taking up to the timeout to launch a container / find a ram slot / find a free container. Otherwise, it's not related to the agent and the http server timeouts may need fiddling with (read / write timeout), if ruby client is failing to connect though I'm guessing that it's just that nobody is reading the body (i.e. no function runs) and the error handling isn't very well done, as we are replying with 504 if we hit a timeout (but if nobody is listening, they won't get it).
This commit is contained in:
@@ -25,7 +25,6 @@ import (
|
|||||||
// TODO async calls need to add route.Headers as well
|
// TODO async calls need to add route.Headers as well
|
||||||
// TODO need to shut off reads/writes in dispatch to the pipes when call times out so that
|
// TODO need to shut off reads/writes in dispatch to the pipes when call times out so that
|
||||||
// 2 calls don't have the same container's pipes...
|
// 2 calls don't have the same container's pipes...
|
||||||
// TODO fix ram token / cold slot racy races (highly unlikely, still fix)
|
|
||||||
// TODO add spans back around container launching for hot (follows from?) + other more granular spans
|
// TODO add spans back around container launching for hot (follows from?) + other more granular spans
|
||||||
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
|
// TODO handle timeouts / no response in sync & async (sync is json+503 atm, not 504, async is empty log+status)
|
||||||
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
|
// see also: server/runner.go wrapping the response writer there, but need to handle async too (push down?)
|
||||||
@@ -249,11 +248,16 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add context cancel here to prevent ramToken/launch race, w/o this ramToken /
|
||||||
|
// launch won't know whether we are no longer receiving or not yet receiving.
|
||||||
|
ctx, launchCancel := context.WithCancel(ctx)
|
||||||
|
defer launchCancel()
|
||||||
|
|
||||||
// if nothing free, wait for ram token or a slot
|
// if nothing free, wait for ram token or a slot
|
||||||
select {
|
select {
|
||||||
case s := <-slots:
|
case s := <-slots:
|
||||||
return s, nil
|
return s, nil
|
||||||
case tok := <-a.ramToken(call.Memory * 1024 * 1024): // convert MB TODO mangle
|
case tok := <-a.ramToken(ctx, call.Memory*1024*1024): // convert MB TODO mangle
|
||||||
errCh = a.launch(ctx, slots, call, tok) // TODO mangle
|
errCh = a.launch(ctx, slots, call, tok) // TODO mangle
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
@@ -352,27 +356,22 @@ func (t *token) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE in theory goroutines spawned here could run forever (i.e. leak), if the provided value
|
|
||||||
// is unable to be satisfied. the calling thread will time out waiting for it and the value
|
|
||||||
// properly adjusted if ever satisfied, but we could trivially provide a ctx here and time
|
|
||||||
// out if the calling thread times out if we want to [just to prevent leaks].
|
|
||||||
//
|
|
||||||
// the received token should be passed directly to launch (unconditionally), launch
|
// the received token should be passed directly to launch (unconditionally), launch
|
||||||
// will close this token (i.e. the receiver should not call Close)
|
// will close this token (i.e. the receiver should not call Close)
|
||||||
func (a *agent) ramToken(memory uint64) <-chan Token {
|
func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
|
||||||
// TODO we could do an initial check here and return in the same thread so
|
|
||||||
// that a calling thread could call this and have a filled channel
|
|
||||||
// immediately so that a a select default case could be used to determine
|
|
||||||
// whether machine is at capacity (and caller can decide whether to wait) --
|
|
||||||
// right now this is a race if used as described.
|
|
||||||
|
|
||||||
c := a.cond
|
c := a.cond
|
||||||
ch := make(chan Token)
|
ch := make(chan Token)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
c.L.Lock()
|
c.L.Lock()
|
||||||
for (a.ramUsed + memory) > a.ramTotal {
|
for (a.ramUsed + memory) > a.ramTotal {
|
||||||
// TODO we could add ctx here
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
c.L.Unlock()
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
c.Wait()
|
c.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -387,10 +386,8 @@ func (a *agent) ramToken(memory uint64) <-chan Token {
|
|||||||
}}
|
}}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
// TODO fix this race. caller needs to provide channel since we could get here
|
|
||||||
// before ramToken has returned. :( or something better, idk
|
|
||||||
case ch <- t:
|
case ch <- t:
|
||||||
default:
|
case <-ctx.Done():
|
||||||
// if we can't send b/c nobody is waiting anymore, need to decrement here
|
// if we can't send b/c nobody is waiting anymore, need to decrement here
|
||||||
t.Close()
|
t.Close()
|
||||||
}
|
}
|
||||||
@@ -502,9 +499,6 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
|
|||||||
// this will work for hot & cold (woo)
|
// this will work for hot & cold (woo)
|
||||||
// if launch encounters a non-nil error it will send it on the returned channel,
|
// if launch encounters a non-nil error it will send it on the returned channel,
|
||||||
// this can be useful if an image doesn't exist, e.g.
|
// this can be useful if an image doesn't exist, e.g.
|
||||||
// TODO i don't like how this came out and it's slightly racy w/ unbuffered channels since
|
|
||||||
// the enclosing thread may not be ready to receive (up to go scheduler), need to tidy up, but
|
|
||||||
// this race is unlikely (still need to fix, yes)
|
|
||||||
func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
|
func (a *agent) launch(ctx context.Context, slots chan<- slot, call *call, tok Token) <-chan error {
|
||||||
ch := make(chan error, 1)
|
ch := make(chan error, 1)
|
||||||
|
|
||||||
@@ -550,8 +544,8 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
|
|||||||
|
|
||||||
slot := &coldSlot{cookie, tok}
|
slot := &coldSlot{cookie, tok}
|
||||||
select {
|
select {
|
||||||
case slots <- slot: // TODO need to make sure receiver will be ready (go routine race)
|
case slots <- slot:
|
||||||
default:
|
case <-ctx.Done():
|
||||||
slot.Close() // if we can't send this slot, need to take care of it ourselves
|
slot.Close() // if we can't send this slot, need to take care of it ourselves
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user