diff --git a/api/agent/agent.go b/api/agent/agent.go index 13e70827e..f588fb7ef 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -2,7 +2,6 @@ package agent import ( "context" - "errors" "io" "net/http" "sync" @@ -188,7 +187,7 @@ func (a *agent) Submit(callI Call) error { select { case <-a.shutdown: - return errors.New("agent shut down") + return models.ErrCallTimeoutServerBusy default: } @@ -266,74 +265,103 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) { isHot := protocol.IsStreamable(protocol.Protocol(call.Format)) if isHot { - // For hot requests, we use a long lived slot queue, which we use to manage hot containers - call.slots = a.slotMgr.getHotSlotQueue(call) start := time.Now() - call.slots.enterState(SlotQueueWaiter) - s, err := a.launchHot(ctx, call) - call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000)) + // For hot requests, we use a long lived slot queue, which we use to manage hot containers + var isNew bool + call.slots, isNew = a.slotMgr.getSlotQueue(call) + if isNew { + go a.hotLauncher(ctx, call) + } + s, err := a.waitHot(ctx, call) + call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000)) return s, err } return a.launchCold(ctx, call) } -// launchHot checks with slot queue to see if a new container needs to be launched and waits -// for available slots in the queue for hot request execution. -func (a *agent) launchHot(ctx context.Context, call *call) (Slot, error) { +// hotLauncher is spawned in a go routine for each slot queue to monitor stats and launch hot +// containers if needed. Upon shutdown or activity timeout, hotLauncher exits and during exit, +// it destroys the slot queue. +func (a *agent) hotLauncher(ctx context.Context, callObj *call) { - isAsync := call.Type == models.TypeAsync + // Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick + // whichever is longer. If in this time, there's no activity, then + // we destroy the hot queue. + timeout := time.Duration(60) * time.Minute + idleTimeout := time.Duration(callObj.IdleTimeout) * time.Second * 2 + if timeout < idleTimeout { + timeout = idleTimeout + } + + logger := common.Logger(ctx) + logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting") + isAsync := callObj.Type == models.TypeAsync -launchLoop: for { - // Check/evaluate if we need to launch a new hot container - doLaunch, stats := call.slots.isNewContainerNeeded() - common.Logger(ctx).WithField("stats", stats).Debug("checking hot container launch ", doLaunch) - - if doLaunch { - ctxToken, tokenCancel := context.WithCancel(context.Background()) - - // wait on token/slot/timeout whichever comes first - select { - case tok, isOpen := <-a.resources.GetResourceToken(ctxToken, call.Memory, isAsync): - tokenCancel() - if !isOpen { - return nil, models.ErrCallTimeoutServerBusy - } - - a.wg.Add(1) - go a.runHot(ctx, call, tok) - case s, ok := <-call.slots.getDequeueChan(): - tokenCancel() - if !ok { - return nil, errors.New("slot shut down while waiting for hot slot") - } - if s.acquireSlot() { - if s.slot.Error() != nil { - s.slot.Close() - return nil, s.slot.Error() - } - return s.slot, nil - } - - // we failed to take ownership of the token (eg. container idle timeout) - // try launching again - continue launchLoop - case <-ctx.Done(): - tokenCancel() - return nil, ctx.Err() + select { + case <-a.shutdown: // server shutdown + return + case <-time.After(timeout): + if a.slotMgr.deleteSlotQueue(callObj.slots) { + logger.Info("Hot function launcher timed out") + return } + case <-callObj.slots.signaller: } - // After launching (if it was necessary) a container, now wait for slot/timeout - // or periodically reevaluate the launchHot() logic from beginning. + isNeeded, stats := callObj.slots.isNewContainerNeeded() + logger.WithField("stats", stats).Debug("Hot function launcher stats") + if !isNeeded { + continue + } + + resourceCtx, cancel := context.WithCancel(context.Background()) + logger.WithField("stats", stats).Info("Hot function launcher starting hot container") + select { - case s, ok := <-call.slots.getDequeueChan(): - if !ok { - return nil, errors.New("slot shut down while waiting for hot slot") + case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, isAsync): + cancel() + if isOpen { + a.wg.Add(1) + go func(ctx context.Context, call *call, tok ResourceToken) { + a.runHot(ctx, call, tok) + a.wg.Done() + }(ctx, callObj, tok) + } else { + // this means the resource was impossible to reserve (eg. memory size we can never satisfy) + callObj.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: models.ErrCallTimeoutServerBusy}) } + case <-time.After(timeout): + cancel() + if a.slotMgr.deleteSlotQueue(callObj.slots) { + logger.Info("Hot function launcher timed out") + return + } + case <-a.shutdown: // server shutdown + cancel() + return + } + } +} + +// waitHot pings and waits for a hot container from the slot queue +func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { + + ch, cancel := call.slots.startDequeuer(ctx) + defer cancel() + + for { + // send a notification to launcHot() + select { + case call.slots.signaller <- true: + default: + } + + select { + case s := <-ch: if s.acquireSlot() { if s.slot.Error() != nil { s.slot.Close() @@ -341,13 +369,13 @@ launchLoop: } return s.slot, nil } - - // we failed to take ownership of the token (eg. container idle timeout) - // try launching again + // we failed to take ownership of the token (eg. container idle timeout) => try again case <-ctx.Done(): return nil, ctx.Err() case <-time.After(time.Duration(200) * time.Millisecond): - // reevaluate + // ping dequeuer again + case <-a.shutdown: // server shutdown + return nil, models.ErrCallTimeoutServerBusy } } } @@ -506,7 +534,6 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) { // We must be careful to only use ctxArg for logs/spans - defer a.wg.Done() // create a span from ctxArg but ignore the new Context // instead we will create a new Context below and explicitly set its span diff --git a/api/agent/slots.go b/api/agent/slots.go index cae779988..c37d87b11 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -23,7 +23,7 @@ type Slot interface { // slotQueueMgr manages hot container slotQueues type slotQueueMgr struct { - hMu sync.RWMutex // protects hot + hMu sync.Mutex // protects hot hot map[string]*slotQueue } @@ -57,8 +57,7 @@ type slotQueue struct { cond *sync.Cond slots []*slotToken nextId uint64 - output chan *slotToken - isClosed bool + signaller chan bool statsLock sync.Mutex // protects stats below stats slotQueueStats } @@ -72,49 +71,12 @@ func NewSlotQueueMgr() *slotQueueMgr { func NewSlotQueue(key string) *slotQueue { obj := &slotQueue{ - key: key, - cond: sync.NewCond(new(sync.Mutex)), - slots: make([]*slotToken, 0), - output: make(chan *slotToken), + key: key, + cond: sync.NewCond(new(sync.Mutex)), + slots: make([]*slotToken, 0), + signaller: make(chan bool, 1), } - // producer go routine to pick LIFO slots and - // push them into output channel - go func() { - for { - obj.cond.L.Lock() - for len(obj.slots) <= 0 && !obj.isClosed { - obj.cond.Wait() - } - - // cleanup and exit - if obj.isClosed { - - purge := obj.slots - obj.slots = obj.slots[:0] - obj.cond.L.Unlock() - - close(obj.output) - - for _, val := range purge { - if val.acquireSlot() { - val.slot.Close() - } - } - - return - } - - // pop - item := obj.slots[len(obj.slots)-1] - obj.slots = obj.slots[:len(obj.slots)-1] - obj.cond.L.Unlock() - - // block - obj.output <- item - } - }() - return obj } @@ -135,19 +97,13 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool { return false } - isFound := false - a.cond.L.Lock() - for idx, val := range a.slots { - if val.id == s.id { - a.slots[0], a.slots[idx] = a.slots[idx], a.slots[0] - isFound = true + for i := 0; i < len(a.slots); i++ { + if a.slots[i].id == s.id { + a.slots = append(a.slots[:i], a.slots[i+1:]...) break } } - if isFound { - a.slots = a.slots[1:] - } a.cond.L.Unlock() s.slot.Close() @@ -156,44 +112,73 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool { return true } -func (a *slotQueue) destroySlotQueue() { - doSignal := false - a.cond.L.Lock() - if !a.isClosed { - a.isClosed = true - doSignal = true - } - a.cond.L.Unlock() - if doSignal { - a.cond.Signal() - } -} +func (a *slotQueue) startDequeuer(ctx context.Context) (chan *slotToken, context.CancelFunc) { -func (a *slotQueue) getDequeueChan() chan *slotToken { - return a.output + ctx, cancel := context.WithCancel(ctx) + + myCancel := func() { + cancel() + a.cond.Broadcast() + } + + output := make(chan *slotToken) + + go func() { + for { + a.cond.L.Lock() + for len(a.slots) <= 0 && (ctx.Err() == nil) { + a.cond.Wait() + } + + if ctx.Err() != nil { + a.cond.L.Unlock() + return + } + + // pop + item := a.slots[len(a.slots)-1] + a.slots = a.slots[:len(a.slots)-1] + a.cond.L.Unlock() + + select { + case output <- item: // good case (dequeued) + case <-item.trigger: // ejected (eject handles cleanup) + case <-ctx.Done(): // time out or cancel from caller + // consume slot, we let the hot container queue the slot again + if item.acquireSlot() { + item.slot.Close() + } + } + } + }() + + return output, myCancel } func (a *slotQueue) queueSlot(slot Slot) *slotToken { token := &slotToken{slot, make(chan struct{}), 0, 0} - isClosed := false a.cond.L.Lock() - if !a.isClosed { - token.id = a.nextId - a.slots = append(a.slots, token) - a.nextId += 1 - } else { - isClosed = true - } + token.id = a.nextId + a.slots = append(a.slots, token) + a.nextId += 1 a.cond.L.Unlock() - if !isClosed { - a.cond.Signal() - return token - } + a.cond.Broadcast() + return token +} - return nil +// isIdle() returns true is there's no activity for this slot queue. This +// means no one is waiting, running or starting. +func (a *slotQueue) isIdle() bool { + var partySize uint64 + + a.statsLock.Lock() + partySize = a.stats.states[SlotQueueWaiter] + a.stats.states[SlotQueueStarter] + a.stats.states[SlotQueueRunner] + a.statsLock.Unlock() + + return partySize == 0 } func (a *slotQueue) getStats() slotQueueStats { @@ -296,32 +281,35 @@ func (a *slotQueue) exitStateWithLatency(metricIdx SlotQueueMetricType, latency // getSlot must ensure that if it receives a slot, it will be returned, otherwise // a container will be locked up forever waiting for slot to free. -func (a *slotQueueMgr) getHotSlotQueue(call *call) *slotQueue { +func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) { key := getSlotQueueKey(call) - a.hMu.RLock() + a.hMu.Lock() slots, ok := a.hot[key] - a.hMu.RUnlock() if !ok { - a.hMu.Lock() - slots, ok = a.hot[key] - if !ok { - slots = NewSlotQueue(key) - a.hot[key] = slots - } - a.hMu.Unlock() + slots = NewSlotQueue(key) + a.hot[key] = slots } - return slots + slots.enterState(SlotQueueWaiter) + a.hMu.Unlock() + + return slots, !ok } // currently unused. But at some point, we need to age/delete old // slotQueues. -func (a *slotQueueMgr) destroySlotQueue(slots *slotQueue) { - slots.destroySlotQueue() +func (a *slotQueueMgr) deleteSlotQueue(slots *slotQueue) bool { + isDeleted := false + a.hMu.Lock() - delete(a.hot, slots.key) + if slots.isIdle() { + delete(a.hot, slots.key) + isDeleted = true + } a.hMu.Unlock() + + return isDeleted } func getSlotQueueKey(call *call) string { diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go new file mode 100644 index 000000000..7939552bf --- /dev/null +++ b/api/agent/slots_test.go @@ -0,0 +1,280 @@ +package agent + +import ( + "context" + "fmt" + "sync" + "testing" + "time" +) + +type testSlot struct { + id uint64 + err error + isClosed bool +} + +func (a *testSlot) exec(ctx context.Context, call *call) error { + return nil +} + +func (a *testSlot) Close() error { + if a.isClosed { + panic(fmt.Errorf("id=%d already closed %v", a.id, a)) + } + a.isClosed = true + return nil +} + +func (a *testSlot) Error() error { + return a.err +} + +func NewTestSlot(id uint64) Slot { + mySlot := &testSlot{ + id: id, + } + return mySlot +} + +func TestSlotQueueBasic1(t *testing.T) { + + maxId := uint64(10) + slotName := "test1" + + slots := make([]Slot, 0, maxId) + tokens := make([]*slotToken, 0, maxId) + + obj := NewSlotQueue(slotName) + + outChan, cancel := obj.startDequeuer(context.Background()) + select { + case z := <-outChan: + t.Fatalf("Should not get anything from queue: %#v", z) + case <-time.After(time.Duration(500) * time.Millisecond): + } + cancel() + + // create slots + for id := uint64(0); id < maxId; id += 1 { + slots = append(slots, NewTestSlot(id)) + } + + // queue a few slots here + for id := uint64(0); id < maxId; id += 1 { + tok := obj.queueSlot(slots[id]) + + innerTok := tok.slot.(*testSlot) + + // check for slot id match + if innerTok != slots[id] { + t.Fatalf("queued testSlot does not match with slotToken.slot %#v vs %#v", innerTok, slots[id]) + } + + tokens = append(tokens, tok) + } + + // Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now. + // but let's eject 9 + if !obj.ejectSlot(tokens[9]) { + t.Fatalf("Cannot eject slotToken: %#v", tokens[9]) + } + // let eject 0 + if !obj.ejectSlot(tokens[0]) { + t.Fatalf("Cannot eject slotToken: %#v", tokens[0]) + } + // let eject 5 + if !obj.ejectSlot(tokens[5]) { + t.Fatalf("Cannot eject slotToken: %#v", tokens[5]) + } + // try ejecting 5 again, it should fail + if obj.ejectSlot(tokens[5]) { + t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) + } + + outChan, cancel = obj.startDequeuer(context.Background()) + + // now we should get 8 + select { + case z := <-outChan: + if z.id != 8 { + t.Fatalf("Bad slotToken received: %#v", z) + } + + if !z.acquireSlot() { + t.Fatalf("Cannot acquire slotToken received: %#v", z) + } + + // second acquire shoudl fail + if z.acquireSlot() { + t.Fatalf("Should not be able to acquire twice slotToken: %#v", z) + } + + z.slot.Close() + + case <-time.After(time.Duration(1) * time.Second): + t.Fatal("timeout in waiting slotToken") + } + + // now we should get 7 + select { + case z := <-outChan: + if z.id != 7 { + t.Fatalf("Bad slotToken received: %#v", z) + } + + // eject it before we can consume + if !obj.ejectSlot(tokens[7]) { + t.Fatalf("Cannot eject slotToken: %#v", tokens[2]) + } + + // we shouldn't be able to consume an ejected slotToken + if z.acquireSlot() { + t.Fatalf("We should not be able to acquire slotToken received: %#v", z) + } + + case <-time.After(time.Duration(1) * time.Second): + t.Fatal("timeout in waiting slotToken") + } + + cancel() + + // we should get nothing or 6 + select { + case z, ok := <-outChan: + if ok { + if z.id != 6 { + t.Fatalf("Should not get anything except for 6 from queue: %#v", z) + } + if !z.acquireSlot() { + t.Fatalf("cannot acquire token: %#v", z) + } + } + case <-time.After(time.Duration(500) * time.Millisecond): + } + + stats1 := obj.getStats() + isNeeded, stats2 := obj.isNewContainerNeeded() + + if stats1 != stats2 { + t.Fatalf("Faulty stats %#v != %#v", stats1, stats2) + } + + // there are no waiters. + if isNeeded { + t.Fatalf("Shouldn't need a container") + } +} + +func TestSlotQueueBasic2(t *testing.T) { + + obj := NewSlotQueue("test2") + + if !obj.isIdle() { + t.Fatalf("Should be idle") + } + if ok, _ := obj.isNewContainerNeeded(); ok { + t.Fatalf("Should not need a new container") + } + + outChan, cancel := obj.startDequeuer(context.Background()) + select { + case z := <-outChan: + t.Fatalf("Should not get anything from queue: %#v", z) + case <-time.After(time.Duration(500) * time.Millisecond): + } + + cancel() +} + +func TestSlotQueueBasic3(t *testing.T) { + + slotName := "test3" + + obj := NewSlotQueue(slotName) + _, cancel1 := obj.startDequeuer(context.Background()) + + slot1 := NewTestSlot(1) + slot2 := NewTestSlot(2) + token1 := obj.queueSlot(slot1) + obj.queueSlot(slot2) + + // now our slot must be ready in outChan, but let's cancel it + // to cause a requeue. This should cause [1, 2] ordering to [2, 1] + cancel1() + + outChan, cancel2 := obj.startDequeuer(context.Background()) + + // we should get '2' since cancel1() reordered the queue + select { + case item, ok := <-outChan: + if !ok { + t.Fatalf("outChan should be open") + } + + inner := item.slot.(*testSlot) + outer := slot2.(*testSlot) + + if inner.id != outer.id { + t.Fatalf("item should be 2") + } + if inner.isClosed { + t.Fatalf("2 should not yet be closed") + } + + if !item.acquireSlot() { + t.Fatalf("2 acquire should not fail") + } + + item.slot.Close() + + case <-time.After(time.Duration(1) * time.Second): + t.Fatal("timeout in waiting slotToken") + } + + // let's eject 1 + if !obj.ejectSlot(token1) { + t.Fatalf("failed to eject 1") + } + if !slot1.(*testSlot).isClosed { + t.Fatalf("1 should be closed") + } + + // spin up bunch of go routines, where each should get a non-acquirable + // token or timeout due the imminent obj.destroySlotQueue() + var wg sync.WaitGroup + goMax := 10 + wg.Add(goMax) + for i := 0; i < goMax; i += 1 { + go func(id int) { + ch, cancl := obj.startDequeuer(context.Background()) + defer cancl() + defer wg.Done() + + select { + case z := <-ch: + t.Fatalf("%v we shouldn't get anything from queue %#v", id, z) + case <-time.After(time.Duration(500) * time.Millisecond): + } + }(i) + } + + // let's cancel after destroy this time + cancel2() + + wg.Wait() + + select { + case z := <-outChan: + t.Fatalf("Should not get anything from queue: %#v", z) + case <-time.After(time.Duration(500) * time.Millisecond): + } + + // both should be closed + if !slot1.(*testSlot).isClosed { + t.Fatalf("item1 should be closed") + } + if !slot2.(*testSlot).isClosed { + t.Fatalf("item2 should be closed") + } +}