diff --git a/api/agent/agent.go b/api/agent/agent.go index f74a8c23a..8d99c5dde 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -344,15 +344,15 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { continue } - resourceCtx, cancel := context.WithCancel(context.Background()) + ctxResource, cancelResource := context.WithCancel(context.Background()) logger.WithFields(logrus.Fields{ "currentStats": curStats, "previousStats": curStats, }).Info("Hot function launcher starting hot container") select { - case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, uint64(callObj.CPUs), isAsync): - cancel() + case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, callObj.Memory, uint64(callObj.CPUs), isAsync): + cancelResource() if isOpen { a.wg.Add(1) go func(ctx context.Context, call *call, tok ResourceToken) { @@ -364,13 +364,13 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { callObj.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: models.ErrCallTimeoutServerBusy}) } case <-time.After(timeout): - cancel() + cancelResource() if a.slotMgr.deleteSlotQueue(callObj.slots) { logger.Info("Hot function launcher timed out") return } case <-a.shutdown: // server shutdown - cancel() + cancelResource() return } } @@ -378,8 +378,10 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { // waitHot pings and waits for a hot container from the slot queue func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { - ch, cancel := call.slots.startDequeuer() - defer cancel() + + ctxDequeuer, cancelDequeuer := context.WithCancel(ctx) + defer cancelDequeuer() + ch := call.slots.startDequeuer(ctxDequeuer) // 1) if we can get a slot immediately, grab it. // 2) if we don't, send a signaller every 200ms until we do. @@ -420,9 +422,11 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { isAsync := call.Type == models.TypeAsync ch := make(chan Slot) + ctxResource, cancelResource := context.WithCancel(ctx) + defer cancelResource() select { - case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): + case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, call.Memory, uint64(call.CPUs), isAsync): if !isOpen { return nil, models.ErrCallTimeoutServerBusy } @@ -431,6 +435,8 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { return nil, ctx.Err() } + cancelResource() + // wait for launch err or a slot to open up select { case s := <-ch: diff --git a/api/agent/async.go b/api/agent/async.go index b145586b3..f31e19685 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -16,13 +16,13 @@ func (a *agent) asyncDequeue() { defer cancel() for { - ch, cancelWait := a.resources.WaitAsyncResource() + ctxResource, cancelResource := context.WithCancel(context.Background()) select { case <-a.shutdown: - cancelWait() + cancelResource() return - case <-ch: - cancelWait() + case <-a.resources.WaitAsyncResource(ctxResource): + cancelResource() // TODO we _could_ return a token here to reserve the ram so that there's // not a race between here and Submit but we're single threaded // dequeueing and retries handled gracefully inside of Submit if we run diff --git a/api/agent/resource.go b/api/agent/resource.go index b2485acde..62fbb8874 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -24,7 +24,7 @@ const ( // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. 
// TODO: add cpu, disk, network IO for future type ResourceTracker interface { - WaitAsyncResource() (chan struct{}, context.CancelFunc) + WaitAsyncResource(ctx context.Context) chan struct{} // returns a closed channel if the resource can never me met. GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken } @@ -115,6 +115,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c memory = memory * Mem1MB c := a.cond + isWaiting := false ch := make(chan ResourceToken) if !a.isResourcePossible(memory, cpuQuota, isAsync) { @@ -122,12 +123,23 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c return ch } + go func() { + <-ctx.Done() + c.L.Lock() + if isWaiting { + c.Broadcast() + } + c.L.Unlock() + }() + go func() { c.L.Lock() + isWaiting = true for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil { c.Wait() } + isWaiting = false if ctx.Err() != nil { c.L.Unlock() @@ -184,24 +196,28 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c // WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use // in the async area is less than high water mark -func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc) { +func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} { ch := make(chan struct{}, 1) - ctx, cancel := context.WithCancel(context.Background()) + isWaiting := false c := a.cond - myCancel := func() { - cancel() + go func() { + <-ctx.Done() c.L.Lock() - c.Broadcast() + if isWaiting { + c.Broadcast() + } c.L.Unlock() - } + }() go func() { c.L.Lock() + isWaiting = true for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil { c.Wait() } + isWaiting = false c.L.Unlock() if ctx.Err() == nil { @@ -209,7 +225,7 @@ func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc } }() - return ch, myCancel + return ch } func minUint64(a, b uint64) uint64 { diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go index e8fadeae6..aa61c6fba 100644 --- a/api/agent/resource_test.go +++ b/api/agent/resource_test.go @@ -116,7 +116,9 @@ func TestResourceAsyncWait(t *testing.T) { // should block & wait vals.mau = vals.mam setTrackerTestVals(tr, &vals) - ch1, cancel1 := tr.WaitAsyncResource() + + ctx1, cancel1 := context.WithCancel(context.Background()) + ch1 := tr.WaitAsyncResource(ctx1) defer cancel1() select { @@ -136,7 +138,8 @@ func TestResourceAsyncWait(t *testing.T) { } // get a new channel to prevent previous test interference - ch2, cancel2 := tr.WaitAsyncResource() + ctx2, cancel2 := context.WithCancel(context.Background()) + ch2 := tr.WaitAsyncResource(ctx2) defer cancel2() // should block & wait @@ -174,11 +177,10 @@ func TestResourceGetSimple(t *testing.T) { setTrackerTestVals(tr, &vals) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // ask for 4GB and 10 CPU + ctx, cancel := context.WithCancel(context.Background()) ch := trI.GetResourceToken(ctx, 4*1024, 1000, false) + defer cancel() _, err := fetchToken(ch) if err == nil { @@ -195,7 +197,9 @@ func TestResourceGetSimple(t *testing.T) { } // ask for another 4GB and 10 CPU + ctx, cancel = context.WithCancel(context.Background()) ch = trI.GetResourceToken(ctx, 4*1024, 1000, false) + defer cancel() _, err = fetchToken(ch) if err == nil { @@ -231,9 +235,8 @@ func TestResourceGetCombo(t *testing.T) { 
vals.setDefaults() setTrackerTestVals(tr, &vals) - ctx, cancel := context.WithCancel(context.Background()) - // impossible request + ctx, cancel := context.WithCancel(context.Background()) ch := trI.GetResourceToken(ctx, 20*1024, 20000, false) if !isClosed(ch) { t.Fatalf("impossible request should return closed channel") @@ -281,6 +284,7 @@ func TestResourceGetCombo(t *testing.T) { if err != nil { t.Fatalf("empty sync system should hand out token3") } + cancel() ctx, cancel = context.WithCancel(context.Background()) @@ -292,6 +296,7 @@ func TestResourceGetCombo(t *testing.T) { if err == nil { t.Fatalf("full system should not hand out a token") } + cancel() ctx, cancel = context.WithCancel(context.Background()) @@ -307,6 +312,7 @@ func TestResourceGetCombo(t *testing.T) { if err != nil { t.Fatalf("async system should hand out token4") } + cancel() ctx, cancel = context.WithCancel(context.Background()) @@ -324,6 +330,7 @@ func TestResourceGetCombo(t *testing.T) { if err != nil { t.Fatalf("async+sync system should hand out token5") } + cancel() // NOW ASYNC AND SYNC POOLS ARE FULL diff --git a/api/agent/slots.go b/api/agent/slots.go index fc3c57abe..c2fe1bcc7 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -112,25 +112,29 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool { return true } -func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) { - - ctx, cancel := context.WithCancel(context.Background()) - - myCancel := func() { - cancel() - a.cond.L.Lock() - a.cond.Broadcast() - a.cond.L.Unlock() - } +func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken { + isWaiting := false output := make(chan *slotToken) + go func() { + <-ctx.Done() + a.cond.L.Lock() + if isWaiting { + a.cond.Broadcast() + } + a.cond.L.Unlock() + }() + go func() { for { a.cond.L.Lock() + + isWaiting = true for len(a.slots) <= 0 && (ctx.Err() == nil) { a.cond.Wait() } + isWaiting = false if ctx.Err() != nil { a.cond.L.Unlock() @@ -154,7 +158,7 @@ func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) { } }() - return output, myCancel + return output } func (a *slotQueue) queueSlot(slot Slot) *slotToken { diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index c700af84f..d4dad66be 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -47,7 +47,8 @@ func TestSlotQueueBasic1(t *testing.T) { obj := NewSlotQueue(slotName) - outChan, cancel := obj.startDequeuer() + ctx, cancel := context.WithCancel(context.Background()) + outChan := obj.startDequeuer(ctx) select { case z := <-outChan: t.Fatalf("Should not get anything from queue: %#v", z) @@ -92,7 +93,8 @@ func TestSlotQueueBasic1(t *testing.T) { t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) } - outChan, cancel = obj.startDequeuer() + ctx, cancel = context.WithCancel(context.Background()) + outChan = obj.startDequeuer(ctx) // now we should get 8 select { @@ -162,14 +164,14 @@ func TestSlotQueueBasic2(t *testing.T) { t.Fatalf("Should be idle") } - outChan, cancel := obj.startDequeuer() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + select { - case z := <-outChan: + case z := <-obj.startDequeuer(ctx): t.Fatalf("Should not get anything from queue: %#v", z) case <-time.After(time.Duration(500) * time.Millisecond): } - - cancel() } func statsHelperSet(runC, startC, waitC, runL, startL, waitL uint64) slotQueueStats { @@ -248,7 +250,8 @@ func TestSlotQueueBasic3(t *testing.T) { slotName := "test3" obj := NewSlotQueue(slotName) - _, cancel1 := 
obj.startDequeuer() + ctx, cancel := context.WithCancel(context.Background()) + obj.startDequeuer(ctx) slot1 := NewTestSlot(1) slot2 := NewTestSlot(2) @@ -257,9 +260,10 @@ func TestSlotQueueBasic3(t *testing.T) { // now our slot must be ready in outChan, but let's cancel it // to cause a requeue. This should cause [1, 2] ordering to [2, 1] - cancel1() + cancel() - outChan, cancel2 := obj.startDequeuer() + ctx, cancel = context.WithCancel(context.Background()) + outChan := obj.startDequeuer(ctx) // we should get '2' since cancel1() reordered the queue select { @@ -303,12 +307,13 @@ func TestSlotQueueBasic3(t *testing.T) { wg.Add(goMax) for i := 0; i < goMax; i += 1 { go func(id int) { - ch, cancl := obj.startDequeuer() - defer cancl() defer wg.Done() + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + select { - case z := <-ch: + case z := <-obj.startDequeuer(ctx): t.Fatalf("%v we shouldn't get anything from queue %#v", id, z) case <-time.After(time.Duration(500) * time.Millisecond): } @@ -316,7 +321,7 @@ func TestSlotQueueBasic3(t *testing.T) { } // let's cancel after destroy this time - cancel2() + cancel() wg.Wait()
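
Note: the recurring change across agent.go, async.go, resource.go, and slots.go above is the same refactor applied three times. APIs that used to hand back a channel plus a context.CancelFunc (startDequeuer, WaitAsyncResource, and the resource-token path) now accept a caller-owned context and return only the channel. Internally, a helper goroutine watches ctx.Done() and broadcasts on the sync.Cond only while a waiter is actually parked (the isWaiting flag), so cancellation can unblock cond.Wait() without spurious wake-ups when nobody is waiting. The following is a minimal, self-contained sketch of that pattern using illustrative names (a toy queue type, not the fn codebase's actual slotQueue or resourceTracker); it only shows the shape of the change, assuming standard sync.Cond semantics.

// Sketch of the context-driven cond.Wait pattern introduced by this diff.
// The queue type and names are hypothetical; only the pattern mirrors the patch.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type queue struct {
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	return &queue{cond: sync.NewCond(new(sync.Mutex))}
}

// startDequeuer mirrors the new startDequeuer(ctx) shape: the caller owns the
// context, and cancelling it is what releases the internal waiter goroutine.
func (q *queue) startDequeuer(ctx context.Context) <-chan int {
	out := make(chan int)
	isWaiting := false

	// Wake the waiter on cancellation, but only if it is actually parked in
	// cond.Wait(); otherwise the Broadcast would be unnecessary noise.
	go func() {
		<-ctx.Done()
		q.cond.L.Lock()
		if isWaiting {
			q.cond.Broadcast()
		}
		q.cond.L.Unlock()
	}()

	go func() {
		q.cond.L.Lock()
		isWaiting = true
		for len(q.items) == 0 && ctx.Err() == nil {
			q.cond.Wait()
		}
		isWaiting = false
		if ctx.Err() != nil {
			q.cond.L.Unlock()
			return
		}
		item := q.items[0]
		q.items = q.items[1:]
		q.cond.L.Unlock()

		select {
		case out <- item:
		case <-ctx.Done():
		}
	}()
	return out
}

func (q *queue) enqueue(v int) {
	q.cond.L.Lock()
	q.items = append(q.items, v)
	q.cond.L.Unlock()
	q.cond.Broadcast()
}

func main() {
	q := newQueue()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := q.startDequeuer(ctx)
	go func() {
		time.Sleep(100 * time.Millisecond)
		q.enqueue(42)
	}()

	select {
	case v := <-ch:
		fmt.Println("got", v)
	case <-time.After(time.Second):
		fmt.Println("timed out")
	}
}

Callers then follow the same shape the diff adopts in waitHot and launchCold: derive a context with context.WithCancel, pass it in, and call (or defer) the cancel once the channel has yielded a value or the caller gives up, which is what unblocks and retires the internal waiter.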