fn: resource and slot cancel and broadcast improvements (#696)

* fn: resource and slot cancel and broadcast improvements

  - The context argument did not wake up waiters correctly upon
    cancellation/timeout.
  - Avoid unnecessary broadcasts in the slot and resource trackers.

* fn: limit scope of context in resource/slot calls in agent
Tolga Ceylan authored 2018-01-18 13:43:56 -08:00 · committed by GitHub
parent c9e995292c · commit 2f0de2b574
6 changed files with 89 additions and 51 deletions
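Both fixes revolve around the same detail: sync.Cond.Wait cannot observe context cancellation on its own, so a goroutine parked on the cond slept through a cancel or timeout, while an unconditional Broadcast on every cancel woke waiters that had nothing to wake up for. Every wait site below (GetResourceToken, WaitAsyncResource, startDequeuer) therefore gains a watcher goroutine that broadcasts when the context ends, guarded by an isWaiting flag so the broadcast only fires when someone is actually parked. A minimal, self-contained sketch of that pattern (names here are illustrative, not from the repo):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitUntil blocks until pred() holds or ctx ends. sync.Cond.Wait cannot
// see ctx.Done() by itself, so a watcher goroutine broadcasts when the
// context is cancelled or times out. The isWaiting flag (read and written
// only under c.L) suppresses the broadcast when nobody is parked on the
// cond, which is the "avoid unnecessary broadcasts" half of the fix.
func waitUntil(ctx context.Context, c *sync.Cond, pred func() bool) error {
	isWaiting := false

	go func() {
		<-ctx.Done()
		c.L.Lock()
		if isWaiting {
			c.Broadcast()
		}
		c.L.Unlock()
	}()

	c.L.Lock()
	isWaiting = true
	for !pred() && ctx.Err() == nil {
		c.Wait()
	}
	isWaiting = false
	c.L.Unlock()

	return ctx.Err()
}

func main() {
	c := sync.NewCond(new(sync.Mutex))
	ready := false

	// free the "resource" after 100ms
	go func() {
		time.Sleep(100 * time.Millisecond)
		c.L.Lock()
		ready = true
		c.Broadcast()
		c.L.Unlock()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitUntil(ctx, c, func() bool { return ready })) // <nil>
}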


@@ -344,15 +344,15 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
 			continue
 		}
-		resourceCtx, cancel := context.WithCancel(context.Background())
+		ctxResource, cancelResource := context.WithCancel(context.Background())
 		logger.WithFields(logrus.Fields{
 			"currentStats":  curStats,
 			"previousStats": curStats,
 		}).Info("Hot function launcher starting hot container")
 		select {
-		case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, uint64(callObj.CPUs), isAsync):
-			cancel()
+		case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, callObj.Memory, uint64(callObj.CPUs), isAsync):
+			cancelResource()
 			if isOpen {
 				a.wg.Add(1)
 				go func(ctx context.Context, call *call, tok ResourceToken) {
@@ -364,13 +364,13 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
 				callObj.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: models.ErrCallTimeoutServerBusy})
 			}
 		case <-time.After(timeout):
-			cancel()
+			cancelResource()
 			if a.slotMgr.deleteSlotQueue(callObj.slots) {
 				logger.Info("Hot function launcher timed out")
 				return
 			}
 		case <-a.shutdown: // server shutdown
-			cancel()
+			cancelResource()
 			return
 		}
 	}
@@ -378,8 +378,10 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
 // waitHot pings and waits for a hot container from the slot queue
 func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
-	ch, cancel := call.slots.startDequeuer()
-	defer cancel()
+	ctxDequeuer, cancelDequeuer := context.WithCancel(ctx)
+	defer cancelDequeuer()
+	ch := call.slots.startDequeuer(ctxDequeuer)

 	// 1) if we can get a slot immediately, grab it.
 	// 2) if we don't, send a signaller every 200ms until we do.
@@ -420,9 +422,11 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	isAsync := call.Type == models.TypeAsync
 	ch := make(chan Slot)

+	ctxResource, cancelResource := context.WithCancel(ctx)
+	defer cancelResource()
 	select {
-	case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
+	case tok, isOpen := <-a.resources.GetResourceToken(ctxResource, call.Memory, uint64(call.CPUs), isAsync):
 		if !isOpen {
 			return nil, models.ErrCallTimeoutServerBusy
 		}
@@ -431,6 +435,8 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 		return nil, ctx.Err()
 	}

+	cancelResource()
+
 	// wait for launch err or a slot to open up
 	select {
 	case s := <-ch:
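The launchCold hunk above also narrows the lifetime of the context handed to GetResourceToken: the token wait gets a child context that is cancelled as soon as the wait is over, so the tracker's waiter goroutine is torn down even though the caller's ctx lives on for the rest of the call. A sketch of the same shape, with a hypothetical acquire helper and newTokenCh standing in for the agent code:

package main

import (
	"context"
	"errors"
	"fmt"
)

// acquire mirrors the launchCold shape: the token wait gets its own child
// context, cancelled the moment the select returns, so the producer
// goroutine behind the channel is released even though the caller's ctx
// lives on. newTokenCh stands in for GetResourceToken here.
func acquire(ctx context.Context, newTokenCh func(context.Context) <-chan string) (string, error) {
	ctxResource, cancelResource := context.WithCancel(ctx)
	defer cancelResource() // covers the error and cancellation paths

	select {
	case tok, isOpen := <-newTokenCh(ctxResource):
		if !isOpen {
			// a closed channel signals an impossible request
			return "", errors.New("server busy")
		}
		cancelResource() // done waiting; release the waiter right away
		return tok, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	newTokenCh := func(context.Context) <-chan string {
		ch := make(chan string, 1)
		ch <- "token-1"
		return ch
	}
	fmt.Println(acquire(context.Background(), newTokenCh)) // token-1 <nil>
}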


@@ -16,13 +16,13 @@ func (a *agent) asyncDequeue() {
 	defer cancel()

 	for {
-		ch, cancelWait := a.resources.WaitAsyncResource()
+		ctxResource, cancelResource := context.WithCancel(context.Background())
 		select {
 		case <-a.shutdown:
-			cancelWait()
+			cancelResource()
 			return
-		case <-ch:
-			cancelWait()
+		case <-a.resources.WaitAsyncResource(ctxResource):
+			cancelResource()
 			// TODO we _could_ return a token here to reserve the ram so that there's
 			// not a race between here and Submit but we're single threaded
 			// dequeueing and retries handled gracefully inside of Submit if we run
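Folding the WaitAsyncResource call into the case arm is safe because Go evaluates the channel operand of every select case exactly once, upon entering the statement, before it blocks; the call is not re-issued if the shutdown case fires first. A small demonstration of that evaluation order:

package main

import (
	"fmt"
	"time"
)

// newWaitCh plays the role of WaitAsyncResource: it is called exactly once
// when the select statement is entered, even though its channel never wins.
func newWaitCh() <-chan struct{} {
	fmt.Println("wait channel created")
	return make(chan struct{}) // never signalled in this demo
}

func main() {
	shutdown := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(shutdown)
	}()

	select {
	case <-shutdown:
		fmt.Println("shutdown won")
	case <-newWaitCh():
		fmt.Println("resource won")
	}
	// Output:
	// wait channel created
	// shutdown won
}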


@@ -24,7 +24,7 @@ const (
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
-	WaitAsyncResource() (chan struct{}, context.CancelFunc)
+	WaitAsyncResource(ctx context.Context) chan struct{}
 	// returns a closed channel if the resource can never me met.
 	GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken
 }
@@ -115,6 +115,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 	memory = memory * Mem1MB

 	c := a.cond
+	isWaiting := false
 	ch := make(chan ResourceToken)

 	if !a.isResourcePossible(memory, cpuQuota, isAsync) {
@@ -122,12 +123,23 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 		return ch
 	}

+	go func() {
+		<-ctx.Done()
+		c.L.Lock()
+		if isWaiting {
+			c.Broadcast()
+		}
+		c.L.Unlock()
+	}()
+
 	go func() {
 		c.L.Lock()
+		isWaiting = true
 		for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
 			c.Wait()
 		}
+		isWaiting = false

 		if ctx.Err() != nil {
 			c.L.Unlock()
@@ -184,24 +196,28 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c

 // WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use
 // in the async area is less than high water mark
-func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc) {
+func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
 	ch := make(chan struct{}, 1)
-	ctx, cancel := context.WithCancel(context.Background())
+	isWaiting := false
 	c := a.cond
-	myCancel := func() {
-		cancel()
-		c.L.Lock()
-		c.Broadcast()
-		c.L.Unlock()
-	}
+
+	go func() {
+		<-ctx.Done()
+		c.L.Lock()
+		if isWaiting {
+			c.Broadcast()
+		}
+		c.L.Unlock()
+	}()

 	go func() {
 		c.L.Lock()
+		isWaiting = true
 		for (a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
 			c.Wait()
 		}
+		isWaiting = false
 		c.L.Unlock()

 		if ctx.Err() == nil {
@@ -209,7 +225,7 @@ func (a *resourceTracker) WaitAsyncResource() (chan struct{}, context.CancelFunc
 		}
 	}()

-	return ch, myCancel
+	return ch
 }

 func minUint64(a, b uint64) uint64 {
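With the new signature the caller owns the context, so the waiter's lifetime is explicit at the call site instead of hidden behind a returned CancelFunc. One subtlety carried over from the implementation above: on cancellation the tracker simply stops without sending, so callers must also select on their own ctx.Done(). A hedged call-site sketch (the awaitAsyncCapacity helper is illustrative, not from the repo):

package example

import (
	"context"
	"time"
)

// ResourceTracker repeats just the method changed above.
type ResourceTracker interface {
	WaitAsyncResource(ctx context.Context) chan struct{}
}

// awaitAsyncCapacity shows the call-site shape after the API change: the
// caller's context bounds the wait, replacing the CancelFunc the old
// signature returned. Because the tracker only sends while ctx is still
// live, the ctx.Done() case is what terminates a cancelled wait.
func awaitAsyncCapacity(rt ResourceTracker, timeout time.Duration) bool {
	ctxWait, cancelWait := context.WithTimeout(context.Background(), timeout)
	defer cancelWait()

	select {
	case <-rt.WaitAsyncResource(ctxWait):
		return true // async RAM/CPU dropped below the high-water mark
	case <-ctxWait.Done():
		return false // timed out or cancelled; caller may retry
	}
}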


@@ -116,7 +116,9 @@ func TestResourceAsyncWait(t *testing.T) {
 	// should block & wait
 	vals.mau = vals.mam
 	setTrackerTestVals(tr, &vals)
-	ch1, cancel1 := tr.WaitAsyncResource()
+
+	ctx1, cancel1 := context.WithCancel(context.Background())
+	ch1 := tr.WaitAsyncResource(ctx1)
 	defer cancel1()

 	select {
@@ -136,7 +138,8 @@ func TestResourceAsyncWait(t *testing.T) {
 	}

 	// get a new channel to prevent previous test interference
-	ch2, cancel2 := tr.WaitAsyncResource()
+	ctx2, cancel2 := context.WithCancel(context.Background())
+	ch2 := tr.WaitAsyncResource(ctx2)
 	defer cancel2()

 	// should block & wait
@@ -174,11 +177,10 @@ func TestResourceGetSimple(t *testing.T) {
 	setTrackerTestVals(tr, &vals)

-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	// ask for 4GB and 10 CPU
+	ctx, cancel := context.WithCancel(context.Background())
 	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false)
+	defer cancel()

 	_, err := fetchToken(ch)
 	if err == nil {
@@ -195,7 +197,9 @@ func TestResourceGetSimple(t *testing.T) {
 	}

 	// ask for another 4GB and 10 CPU
+	ctx, cancel = context.WithCancel(context.Background())
 	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false)
+	defer cancel()

 	_, err = fetchToken(ch)
 	if err == nil {
@@ -231,9 +235,8 @@ func TestResourceGetCombo(t *testing.T) {
 	vals.setDefaults()
 	setTrackerTestVals(tr, &vals)

-	ctx, cancel := context.WithCancel(context.Background())
-
 	// impossible request
+	ctx, cancel := context.WithCancel(context.Background())
 	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false)
 	if !isClosed(ch) {
 		t.Fatalf("impossible request should return closed channel")
@@ -281,6 +284,7 @@ func TestResourceGetCombo(t *testing.T) {
 	if err != nil {
 		t.Fatalf("empty sync system should hand out token3")
 	}
+
 	cancel()
 	ctx, cancel = context.WithCancel(context.Background())
@@ -292,6 +296,7 @@ func TestResourceGetCombo(t *testing.T) {
 	if err == nil {
 		t.Fatalf("full system should not hand out a token")
 	}
+
 	cancel()
 	ctx, cancel = context.WithCancel(context.Background())
@@ -307,6 +312,7 @@ func TestResourceGetCombo(t *testing.T) {
 	if err != nil {
 		t.Fatalf("async system should hand out token4")
 	}
+
 	cancel()
 	ctx, cancel = context.WithCancel(context.Background())
@@ -324,6 +330,7 @@ func TestResourceGetCombo(t *testing.T) {
 	if err != nil {
 		t.Fatalf("async+sync system should hand out token5")
 	}
+
 	cancel()

 	// NOW ASYNC AND SYNC POOLS ARE FULL
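The test churn follows a single discipline: every context.WithCancel now has its cancel invoked (directly or via defer) before ctx and cancel are reassigned, so the waiter goroutine spawned by one step cannot linger into the next assertion. Condensed, the shape is roughly this fragment (trI and fetchToken are identifiers from the test file):

// Each step cancels its context before ctx/cancel are reused, so the
// waiter goroutine from one step cannot broadcast into the next.
ctx, cancel := context.WithCancel(context.Background())
ch := trI.GetResourceToken(ctx, 4*1024, 1000, false)
_, err := fetchToken(ch)
// ... assertions on err ...
cancel() // tear down this step's waiter before reusing the variables

ctx, cancel = context.WithCancel(context.Background())
// ... next request and assertions ...
cancel()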


@@ -112,25 +112,29 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool {
 	return true
 }

-func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(context.Background())
-	myCancel := func() {
-		cancel()
-		a.cond.L.Lock()
-		a.cond.Broadcast()
-		a.cond.L.Unlock()
-	}
+func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken {
+	isWaiting := false
 	output := make(chan *slotToken)

+	go func() {
+		<-ctx.Done()
+		a.cond.L.Lock()
+		if isWaiting {
+			a.cond.Broadcast()
+		}
+		a.cond.L.Unlock()
+	}()
+
 	go func() {
 		for {
 			a.cond.L.Lock()
+			isWaiting = true
 			for len(a.slots) <= 0 && (ctx.Err() == nil) {
 				a.cond.Wait()
 			}
+			isWaiting = false

 			if ctx.Err() != nil {
 				a.cond.L.Unlock()
@@ -154,7 +158,7 @@ func (a *slotQueue) startDequeuer() (chan *slotToken, context.CancelFunc) {
 		}
 	}()

-	return output, myCancel
+	return output
 }

 func (a *slotQueue) queueSlot(slot Slot) *slotToken {
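startDequeuer now takes the caller's context and returns only the channel; cancelling that context replaces the old returned CancelFunc, waking the internal goroutine out of cond.Wait and letting its loop exit via ctx.Err(). A call-site fragment in the style of waitHot above:

// Cancelling ctxDequeuer both stops the dequeuer goroutine and wakes it
// if it is currently parked in cond.Wait.
ctxDequeuer, cancelDequeuer := context.WithCancel(ctx)
defer cancelDequeuer()
ch := call.slots.startDequeuer(ctxDequeuer)

select {
case tok := <-ch:
	// won a slot token; contend for the hot container
	_ = tok
case <-time.After(200 * time.Millisecond):
	// nothing hot yet; signal the launcher and try again
}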


@@ -47,7 +47,8 @@ func TestSlotQueueBasic1(t *testing.T) {
 	obj := NewSlotQueue(slotName)

-	outChan, cancel := obj.startDequeuer()
+	ctx, cancel := context.WithCancel(context.Background())
+	outChan := obj.startDequeuer(ctx)

 	select {
 	case z := <-outChan:
 		t.Fatalf("Should not get anything from queue: %#v", z)
@@ -92,7 +93,8 @@ func TestSlotQueueBasic1(t *testing.T) {
 		t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5])
 	}

-	outChan, cancel = obj.startDequeuer()
+	ctx, cancel = context.WithCancel(context.Background())
+	outChan = obj.startDequeuer(ctx)

 	// now we should get 8
 	select {
@@ -162,14 +164,14 @@ func TestSlotQueueBasic2(t *testing.T) {
 		t.Fatalf("Should be idle")
 	}

-	outChan, cancel := obj.startDequeuer()
-	defer cancel()
+	ctx, cancel := context.WithCancel(context.Background())

 	select {
-	case z := <-outChan:
+	case z := <-obj.startDequeuer(ctx):
 		t.Fatalf("Should not get anything from queue: %#v", z)
 	case <-time.After(time.Duration(500) * time.Millisecond):
 	}
+
+	cancel()
 }

 func statsHelperSet(runC, startC, waitC, runL, startL, waitL uint64) slotQueueStats {
@@ -248,7 +250,8 @@ func TestSlotQueueBasic3(t *testing.T) {
 	slotName := "test3"
 	obj := NewSlotQueue(slotName)

-	_, cancel1 := obj.startDequeuer()
+	ctx, cancel := context.WithCancel(context.Background())
+	obj.startDequeuer(ctx)

 	slot1 := NewTestSlot(1)
 	slot2 := NewTestSlot(2)
@@ -257,9 +260,10 @@ func TestSlotQueueBasic3(t *testing.T) {

 	// now our slot must be ready in outChan, but let's cancel it
 	// to cause a requeue. This should cause [1, 2] ordering to [2, 1]
-	cancel1()
+	cancel()

-	outChan, cancel2 := obj.startDequeuer()
+	ctx, cancel = context.WithCancel(context.Background())
+	outChan := obj.startDequeuer(ctx)

 	// we should get '2' since cancel1() reordered the queue
 	select {
@@ -303,12 +307,13 @@ func TestSlotQueueBasic3(t *testing.T) {
 	wg.Add(goMax)
 	for i := 0; i < goMax; i += 1 {
 		go func(id int) {
-			ch, cancl := obj.startDequeuer()
-			defer cancl()
 			defer wg.Done()
+			ctx, cancel = context.WithCancel(context.Background())
+			defer cancel()
 			select {
-			case z := <-ch:
+			case z := <-obj.startDequeuer(ctx):
 				t.Fatalf("%v we shouldn't get anything from queue %#v", id, z)
 			case <-time.After(time.Duration(500) * time.Millisecond):
 			}
@@ -316,7 +321,7 @@ func TestSlotQueueBasic3(t *testing.T) {
 	}

 	// let's cancel after destroy this time
-	cancel2()
+	cancel()
 	wg.Wait()