diff --git a/api/agent/agent.go b/api/agent/agent.go index 9d3adea03..3e82c8546 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -306,6 +306,7 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { logger := common.Logger(ctx) logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting") isAsync := callObj.Type == models.TypeAsync + prevStats := callObj.slots.getStats() for { select { @@ -319,14 +320,22 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { case <-callObj.slots.signaller: } - isNeeded, stats := callObj.slots.isNewContainerNeeded() - logger.WithField("stats", stats).Debug("Hot function launcher stats") + curStats := callObj.slots.getStats() + isNeeded := isNewContainerNeeded(&curStats, &prevStats) + prevStats = curStats + logger.WithFields(logrus.Fields{ + "currentStats": curStats, + "previousStats": curStats, + }).Debug("Hot function launcher stats") if !isNeeded { continue } resourceCtx, cancel := context.WithCancel(context.Background()) - logger.WithField("stats", stats).Info("Hot function launcher starting hot container") + logger.WithFields(logrus.Fields{ + "currentStats": curStats, + "previousStats": curStats, + }).Info("Hot function launcher starting hot container") select { case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, isAsync): diff --git a/api/agent/slots.go b/api/agent/slots.go index 511d666b9..38d96ba44 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -189,60 +189,59 @@ func (a *slotQueue) getStats() slotQueueStats { return out } -func (a *slotQueue) isNewContainerNeeded() (bool, slotQueueStats) { +func isNewContainerNeeded(cur, prev *slotQueueStats) bool { - stats := a.getStats() - - waiters := stats.states[SlotQueueWaiter] + waiters := cur.states[SlotQueueWaiter] if waiters == 0 { - return false, stats + return false } // while a container is starting, do not start more than waiters - starters := stats.states[SlotQueueStarter] + starters := cur.states[SlotQueueStarter] if starters >= waiters { - return false, stats + return false } - // this is a bit aggresive and assumes that we only - // want to queue as much as num of containers. - executors := starters + stats.states[SlotQueueRunner] - if executors < waiters { - return true, stats + // no executors? We need to spin up a container quickly + executors := starters + cur.states[SlotQueueRunner] + if executors == 0 { + return true + } + + // This means we are not making any progress and stats are + // not being refreshed quick enough. We err on side + // of new container here. + isEqual := true + for idx, _ := range cur.latencies { + if prev.latencies[idx] != cur.latencies[idx] { + isEqual = false + break + } + } + if isEqual { + return true } // WARNING: Below is a few heuristics that are // speculative, which may (and will) likely need // adjustments. - // WARNING: latencies below are updated after a call - // switches to/from different states. Do not assume - // the metrics below are always up-to-date. For example, - // a sudden burst of incoming requests will increase - // waiter count but not necessarily wait latency until - // those requests switch from waiter state. - - runLat := stats.latencies[SlotQueueRunner] - waitLat := stats.latencies[SlotQueueWaiter] - startLat := stats.latencies[SlotQueueStarter] - - // no wait latency? No need to spin up new container - if waitLat == 0 { - return false, stats - } + runLat := cur.latencies[SlotQueueRunner] + waitLat := cur.latencies[SlotQueueWaiter] + startLat := cur.latencies[SlotQueueStarter] // this determines the aggresiveness of the container launch. - if runLat/executors*2 < waitLat { - return true, stats + if executors > 0 && runLat/executors*2 < waitLat { + return true } if runLat < waitLat { - return true, stats + return true } if startLat < waitLat { - return true, stats + return true } - return false, stats + return false } func (a *slotQueue) enterState(metricIdx SlotQueueMetricType) { diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index 7939552bf..131578798 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -152,18 +152,6 @@ func TestSlotQueueBasic1(t *testing.T) { } case <-time.After(time.Duration(500) * time.Millisecond): } - - stats1 := obj.getStats() - isNeeded, stats2 := obj.isNewContainerNeeded() - - if stats1 != stats2 { - t.Fatalf("Faulty stats %#v != %#v", stats1, stats2) - } - - // there are no waiters. - if isNeeded { - t.Fatalf("Shouldn't need a container") - } } func TestSlotQueueBasic2(t *testing.T) { @@ -173,9 +161,6 @@ func TestSlotQueueBasic2(t *testing.T) { if !obj.isIdle() { t.Fatalf("Should be idle") } - if ok, _ := obj.isNewContainerNeeded(); ok { - t.Fatalf("Should not need a new container") - } outChan, cancel := obj.startDequeuer(context.Background()) select { @@ -187,6 +172,27 @@ func TestSlotQueueBasic2(t *testing.T) { cancel() } +func TestSlotNewContainerLogic1(t *testing.T) { + + cur := slotQueueStats{} + cur.states[SlotQueueRunner] = 10 + cur.states[SlotQueueWaiter] = 1 + + prev := cur + + if !isNewContainerNeeded(&cur, &prev) { + t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) + } + + prev.latencies[SlotQueueRunner] = 1 + prev.latencies[SlotQueueWaiter] = 1 + prev.latencies[SlotQueueStarter] = 1 + + if isNewContainerNeeded(&cur, &prev) { + t.Fatalf("Should not need a new container cur: %#v prev: %#v", cur, prev) + } +} + func TestSlotQueueBasic3(t *testing.T) { slotName := "test3"