diff --git a/api/agent/agent.go b/api/agent/agent.go index 2e1f24358..af33b8172 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -379,7 +379,7 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { // waitHot pings and waits for a hot container from the slot queue func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { - ch, cancel := call.slots.startDequeuer(ctx) + ch, cancel := call.slots.startDequeuer() defer cancel() for { diff --git a/api/agent/async.go b/api/agent/async.go index 9db9277d3..b145586b3 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -16,10 +16,13 @@ func (a *agent) asyncDequeue() { defer cancel() for { + ch, cancelWait := a.resources.WaitAsyncResource() select { case <-a.shutdown: + cancelWait() return - case <-a.resources.WaitAsyncResource(): + case <-ch: + cancelWait() // TODO we _could_ return a token here to reserve the ram so that there's // not a race between here and Submit but we're single threaded // dequeueing and retries handled gracefully inside of Submit if we run diff --git a/api/agent/resource.go b/api/agent/resource.go index 8e2498ee5..b2485acde 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -24,7 +24,7 @@ const ( // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. // TODO: add cpu, disk, network IO for future type ResourceTracker interface { - WaitAsyncResource() chan struct{} + WaitAsyncResource() (chan struct{}, context.CancelFunc) // returns a closed channel if the resource can never me met. GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken } @@ -184,21 +184,32 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c // WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use // in the async area is less than high water mark -func (a *resourceTracker) WaitAsyncResource() chan struct{} { - ch := make(chan struct{}) +func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc) { + ch := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) c := a.cond + + myCancel := func() { + cancel() + c.L.Lock() + c.Broadcast() + c.L.Unlock() + } + go func() { c.L.Lock() - for a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark { + for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil { c.Wait() } c.L.Unlock() - ch <- struct{}{} - // TODO this could leak forever (only in shutdown, blech) + + if ctx.Err() == nil { + ch <- struct{}{} + } }() - return ch + return ch, myCancel } func minUint64(a, b uint64) uint64 { diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go index 3044fc492..e8fadeae6 100644 --- a/api/agent/resource_test.go +++ b/api/agent/resource_test.go @@ -116,10 +116,11 @@ func TestResourceAsyncWait(t *testing.T) { // should block & wait vals.mau = vals.mam setTrackerTestVals(tr, &vals) - ch := tr.WaitAsyncResource() + ch1, cancel1 := tr.WaitAsyncResource() + defer cancel1() select { - case <-ch: + case <-ch1: t.Fatal("high water mark MEM over, should not trigger") case <-time.After(time.Duration(500) * time.Millisecond): } @@ -129,20 +130,21 @@ func TestResourceAsyncWait(t *testing.T) { setTrackerTestVals(tr, &vals) select { - case <-ch: + case <-ch1: case <-time.After(time.Duration(500) * time.Millisecond): t.Fatal("high water mark MEM not over, should trigger") } // get a new channel to prevent previous test interference - ch = tr.WaitAsyncResource() + ch2, cancel2 := tr.WaitAsyncResource() + defer cancel2() // should block & wait vals.cau = vals.cam setTrackerTestVals(tr, &vals) select { - case <-ch: + case <-ch2: t.Fatal("high water mark CPU over, should not trigger") case <-time.After(time.Duration(500) * time.Millisecond): } @@ -152,11 +154,10 @@ func TestResourceAsyncWait(t *testing.T) { setTrackerTestVals(tr, &vals) select { - case <-ch: + case <-ch2: case <-time.After(time.Duration(500) * time.Millisecond): t.Fatal("high water mark CPU not over, should trigger") } - } func TestResourceGetSimple(t *testing.T) { diff --git a/api/agent/slots.go b/api/agent/slots.go index 0eff17da3..fc3c57abe 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -112,13 +112,15 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool { return true } -func (a *slotQueue) startDequeuer(ctx context.Context) (chan *slotToken, context.CancelFunc) { +func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(context.Background()) myCancel := func() { cancel() + a.cond.L.Lock() a.cond.Broadcast() + a.cond.L.Unlock() } output := make(chan *slotToken) diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index 5c5fb74de..c700af84f 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -47,7 +47,7 @@ func TestSlotQueueBasic1(t *testing.T) { obj := NewSlotQueue(slotName) - outChan, cancel := obj.startDequeuer(context.Background()) + outChan, cancel := obj.startDequeuer() select { case z := <-outChan: t.Fatalf("Should not get anything from queue: %#v", z) @@ -92,7 +92,7 @@ func TestSlotQueueBasic1(t *testing.T) { t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) } - outChan, cancel = obj.startDequeuer(context.Background()) + outChan, cancel = obj.startDequeuer() // now we should get 8 select { @@ -162,7 +162,7 @@ func TestSlotQueueBasic2(t *testing.T) { t.Fatalf("Should be idle") } - outChan, cancel := obj.startDequeuer(context.Background()) + outChan, cancel := obj.startDequeuer() select { case z := <-outChan: t.Fatalf("Should not get anything from queue: %#v", z) @@ -248,7 +248,7 @@ func TestSlotQueueBasic3(t *testing.T) { slotName := "test3" obj := NewSlotQueue(slotName) - _, cancel1 := obj.startDequeuer(context.Background()) + _, cancel1 := obj.startDequeuer() slot1 := NewTestSlot(1) slot2 := NewTestSlot(2) @@ -259,7 +259,7 @@ func TestSlotQueueBasic3(t *testing.T) { // to cause a requeue. This should cause [1, 2] ordering to [2, 1] cancel1() - outChan, cancel2 := obj.startDequeuer(context.Background()) + outChan, cancel2 := obj.startDequeuer() // we should get '2' since cancel1() reordered the queue select { @@ -303,7 +303,7 @@ func TestSlotQueueBasic3(t *testing.T) { wg.Add(goMax) for i := 0; i < goMax; i += 1 { go func(id int) { - ch, cancl := obj.startDequeuer(context.Background()) + ch, cancl := obj.startDequeuer() defer cancl() defer wg.Done()