fn: cancellations in WaitAsyncResource (#694)

* fn: cancellations in WaitAsyncResource

Added go context with cancel to wait async resource. Although
today, the only case for cancellation is shutdown, this cleans
up agent shutdown a little bit.

* fn: locked broadcast to avoid missed wake-ups

* fn: removed ctx arg to WaitAsyncResource and startDequeuer

This is confusing and unnecessary.
This commit is contained in:
Tolga Ceylan
2018-01-17 16:08:54 -08:00
committed by GitHub
parent 7c74c2fe88
commit 5a7778a656
6 changed files with 41 additions and 24 deletions

View File

@@ -379,7 +379,7 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
// waitHot pings and waits for a hot container from the slot queue // waitHot pings and waits for a hot container from the slot queue
func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
ch, cancel := call.slots.startDequeuer(ctx) ch, cancel := call.slots.startDequeuer()
defer cancel() defer cancel()
for { for {

View File

@@ -16,10 +16,13 @@ func (a *agent) asyncDequeue() {
defer cancel() defer cancel()
for { for {
ch, cancelWait := a.resources.WaitAsyncResource()
select { select {
case <-a.shutdown: case <-a.shutdown:
cancelWait()
return return
case <-a.resources.WaitAsyncResource(): case <-ch:
cancelWait()
// TODO we _could_ return a token here to reserve the ram so that there's // TODO we _could_ return a token here to reserve the ram so that there's
// not a race between here and Submit but we're single threaded // not a race between here and Submit but we're single threaded
// dequeueing and retries handled gracefully inside of Submit if we run // dequeueing and retries handled gracefully inside of Submit if we run

View File

@@ -24,7 +24,7 @@ const (
// A simple resource (memory, cpu, disk, etc.) tracker for scheduling. // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
// TODO: add cpu, disk, network IO for future // TODO: add cpu, disk, network IO for future
type ResourceTracker interface { type ResourceTracker interface {
WaitAsyncResource() chan struct{} WaitAsyncResource() (chan struct{}, context.CancelFunc)
// returns a closed channel if the resource can never be met. // returns a closed channel if the resource can never be met.
GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken
} }
@@ -184,21 +184,32 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
// WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use // WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use
// in the async area is less than high water mark // in the async area is less than high water mark
func (a *resourceTracker) WaitAsyncResource() chan struct{} { func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc) {
ch := make(chan struct{}) ch := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
c := a.cond c := a.cond
myCancel := func() {
cancel()
c.L.Lock()
c.Broadcast()
c.L.Unlock()
}
go func() { go func() {
c.L.Lock() c.L.Lock()
for a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark { for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
c.Wait() c.Wait()
} }
c.L.Unlock() c.L.Unlock()
ch <- struct{}{}
// TODO this could leak forever (only in shutdown, blech) if ctx.Err() == nil {
ch <- struct{}{}
}
}() }()
return ch return ch, myCancel
} }
func minUint64(a, b uint64) uint64 { func minUint64(a, b uint64) uint64 {

View File

@@ -116,10 +116,11 @@ func TestResourceAsyncWait(t *testing.T) {
// should block & wait // should block & wait
vals.mau = vals.mam vals.mau = vals.mam
setTrackerTestVals(tr, &vals) setTrackerTestVals(tr, &vals)
ch := tr.WaitAsyncResource() ch1, cancel1 := tr.WaitAsyncResource()
defer cancel1()
select { select {
case <-ch: case <-ch1:
t.Fatal("high water mark MEM over, should not trigger") t.Fatal("high water mark MEM over, should not trigger")
case <-time.After(time.Duration(500) * time.Millisecond): case <-time.After(time.Duration(500) * time.Millisecond):
} }
@@ -129,20 +130,21 @@ func TestResourceAsyncWait(t *testing.T) {
setTrackerTestVals(tr, &vals) setTrackerTestVals(tr, &vals)
select { select {
case <-ch: case <-ch1:
case <-time.After(time.Duration(500) * time.Millisecond): case <-time.After(time.Duration(500) * time.Millisecond):
t.Fatal("high water mark MEM not over, should trigger") t.Fatal("high water mark MEM not over, should trigger")
} }
// get a new channel to prevent previous test interference // get a new channel to prevent previous test interference
ch = tr.WaitAsyncResource() ch2, cancel2 := tr.WaitAsyncResource()
defer cancel2()
// should block & wait // should block & wait
vals.cau = vals.cam vals.cau = vals.cam
setTrackerTestVals(tr, &vals) setTrackerTestVals(tr, &vals)
select { select {
case <-ch: case <-ch2:
t.Fatal("high water mark CPU over, should not trigger") t.Fatal("high water mark CPU over, should not trigger")
case <-time.After(time.Duration(500) * time.Millisecond): case <-time.After(time.Duration(500) * time.Millisecond):
} }
@@ -152,11 +154,10 @@ func TestResourceAsyncWait(t *testing.T) {
setTrackerTestVals(tr, &vals) setTrackerTestVals(tr, &vals)
select { select {
case <-ch: case <-ch2:
case <-time.After(time.Duration(500) * time.Millisecond): case <-time.After(time.Duration(500) * time.Millisecond):
t.Fatal("high water mark CPU not over, should trigger") t.Fatal("high water mark CPU not over, should trigger")
} }
} }
func TestResourceGetSimple(t *testing.T) { func TestResourceGetSimple(t *testing.T) {

View File

@@ -112,13 +112,15 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool {
return true return true
} }
func (a *slotQueue) startDequeuer(ctx context.Context) (chan *slotToken, context.CancelFunc) { func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(context.Background())
myCancel := func() { myCancel := func() {
cancel() cancel()
a.cond.L.Lock()
a.cond.Broadcast() a.cond.Broadcast()
a.cond.L.Unlock()
} }
output := make(chan *slotToken) output := make(chan *slotToken)

View File

@@ -47,7 +47,7 @@ func TestSlotQueueBasic1(t *testing.T) {
obj := NewSlotQueue(slotName) obj := NewSlotQueue(slotName)
outChan, cancel := obj.startDequeuer(context.Background()) outChan, cancel := obj.startDequeuer()
select { select {
case z := <-outChan: case z := <-outChan:
t.Fatalf("Should not get anything from queue: %#v", z) t.Fatalf("Should not get anything from queue: %#v", z)
@@ -92,7 +92,7 @@ func TestSlotQueueBasic1(t *testing.T) {
t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5])
} }
outChan, cancel = obj.startDequeuer(context.Background()) outChan, cancel = obj.startDequeuer()
// now we should get 8 // now we should get 8
select { select {
@@ -162,7 +162,7 @@ func TestSlotQueueBasic2(t *testing.T) {
t.Fatalf("Should be idle") t.Fatalf("Should be idle")
} }
outChan, cancel := obj.startDequeuer(context.Background()) outChan, cancel := obj.startDequeuer()
select { select {
case z := <-outChan: case z := <-outChan:
t.Fatalf("Should not get anything from queue: %#v", z) t.Fatalf("Should not get anything from queue: %#v", z)
@@ -248,7 +248,7 @@ func TestSlotQueueBasic3(t *testing.T) {
slotName := "test3" slotName := "test3"
obj := NewSlotQueue(slotName) obj := NewSlotQueue(slotName)
_, cancel1 := obj.startDequeuer(context.Background()) _, cancel1 := obj.startDequeuer()
slot1 := NewTestSlot(1) slot1 := NewTestSlot(1)
slot2 := NewTestSlot(2) slot2 := NewTestSlot(2)
@@ -259,7 +259,7 @@ func TestSlotQueueBasic3(t *testing.T) {
// to cause a requeue. This should cause [1, 2] ordering to [2, 1] // to cause a requeue. This should cause [1, 2] ordering to [2, 1]
cancel1() cancel1()
outChan, cancel2 := obj.startDequeuer(context.Background()) outChan, cancel2 := obj.startDequeuer()
// we should get '2' since cancel1() reordered the queue // we should get '2' since cancel1() reordered the queue
select { select {
@@ -303,7 +303,7 @@ func TestSlotQueueBasic3(t *testing.T) {
wg.Add(goMax) wg.Add(goMax)
for i := 0; i < goMax; i += 1 { for i := 0; i < goMax; i += 1 {
go func(id int) { go func(id int) {
ch, cancl := obj.startDequeuer(context.Background()) ch, cancl := obj.startDequeuer()
defer cancl() defer cancl()
defer wg.Done() defer wg.Done()