diff --git a/api/agent/agent.go b/api/agent/agent.go index 8d99c5dde..22dbce5d3 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -319,7 +319,6 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { logger := common.Logger(ctx) logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting") isAsync := callObj.Type == models.TypeAsync - prevStats := callObj.slots.getStats() for { select { @@ -334,11 +333,10 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { } curStats := callObj.slots.getStats() - isNeeded := isNewContainerNeeded(&curStats, &prevStats) - prevStats = curStats + isNeeded := isNewContainerNeeded(&curStats) logger.WithFields(logrus.Fields{ - "currentStats": curStats, - "previousStats": curStats, + "currentStats": curStats, + "isNeeded": isNeeded, }).Debug("Hot function launcher stats") if !isNeeded { continue @@ -346,8 +344,8 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { ctxResource, cancelResource := context.WithCancel(context.Background()) logger.WithFields(logrus.Fields{ - "currentStats": curStats, - "previousStats": curStats, + "currentStats": curStats, + "isNeeded": isNeeded, }).Info("Hot function launcher starting hot container") select { @@ -673,22 +671,28 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) { } done := make(chan struct{}) + start := time.Now() + call.slots.enterState(SlotQueueIdle) s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil}) select { case <-s.trigger: + call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) case <-time.After(time.Duration(call.IdleTimeout) * time.Second): if call.slots.ejectSlot(s) { + call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) logger.Info("Canceling inactive hot function") shutdownContainer() return } case <-ctx.Done(): // container shutdown if call.slots.ejectSlot(s) { + call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) return } case <-a.shutdown: // server shutdown if call.slots.ejectSlot(s) { + call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) shutdownContainer() return } diff --git a/api/agent/slots.go b/api/agent/slots.go index c2fe1bcc7..478053cf4 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -30,17 +30,17 @@ type slotQueueMgr struct { type SlotQueueMetricType int const ( - SlotQueueRunner SlotQueueMetricType = iota - SlotQueueStarter - SlotQueueWaiter + SlotQueueRunner SlotQueueMetricType = iota // container is running + SlotQueueStarter // container is launching + SlotQueueWaiter // requests are waiting + SlotQueueIdle // hot container is running, but idle (free tokens) SlotQueueLast ) // counters per state and moving avg of time spent in each state type slotQueueStats struct { - states [SlotQueueLast]uint64 - latencyCount [SlotQueueLast]uint64 - latencies [SlotQueueLast]uint64 + states [SlotQueueLast]uint64 + latencies [SlotQueueLast]uint64 } type slotToken struct { @@ -195,59 +195,29 @@ func (a *slotQueue) getStats() slotQueueStats { return out } -func isNewContainerNeeded(cur, prev *slotQueueStats) bool { +func isNewContainerNeeded(cur *slotQueueStats) bool { - waiters := cur.states[SlotQueueWaiter] - if waiters == 0 { - return false - } - - // while a container is starting, do not start more than waiters + idlers := cur.states[SlotQueueIdle] starters := cur.states[SlotQueueStarter] - if starters >= waiters { + waiters := cur.states[SlotQueueWaiter] + + // we expect idle containers to immediately pick up + // any waiters. We assume non-idle containers busy. + effectiveWaiters := uint64(0) + if idlers < waiters { + effectiveWaiters = waiters - idlers + } + + if effectiveWaiters == 0 { return false } - // no executors? We need to spin up a container quickly - executors := starters + cur.states[SlotQueueRunner] - if executors == 0 { - return true + // if containers are starting, do not start more than effective waiters + if starters > 0 && starters >= effectiveWaiters { + return false } - // This means we are not making any progress and stats are - // not being refreshed quick enough. We err on side - // of new container here. - isEqual := true - for idx, _ := range cur.latencies { - if prev.latencies[idx] != cur.latencies[idx] { - isEqual = false - break - } - } - if isEqual { - return true - } - - // WARNING: Below is a few heuristics that are - // speculative, which may (and will) likely need - // adjustments. - - runLat := cur.latencies[SlotQueueRunner] - waitLat := cur.latencies[SlotQueueWaiter] - startLat := cur.latencies[SlotQueueStarter] - - // this determines the aggresiveness of the container launch. - if executors > 0 && runLat/executors*2 < waitLat { - return true - } - if runLat < waitLat { - return true - } - if startLat < waitLat { - return true - } - - return false + return true } func (a *slotQueue) enterState(metricIdx SlotQueueMetricType) { @@ -270,14 +240,7 @@ func (a *slotQueue) recordLatencyLocked(metricIdx SlotQueueMetricType, latency u // 0.5 is a high value to age older observations fast while filtering // some noise. For our purposes, newer observations are much more important // than older, but we still would like to low pass some noise. - // first samples are ignored. - if a.stats.latencyCount[metricIdx] != 0 { - a.stats.latencies[metricIdx] = (a.stats.latencies[metricIdx]*5 + latency*5) / 10 - } - a.stats.latencyCount[metricIdx] += 1 - if a.stats.latencyCount[metricIdx] == 0 { - a.stats.latencyCount[metricIdx] += 1 - } + a.stats.latencies[metricIdx] = (a.stats.latencies[metricIdx]*5 + latency*5) / 10 } func (a *slotQueue) recordLatency(metricIdx SlotQueueMetricType, latency uint64) { diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index d4dad66be..6b331f31d 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -174,74 +174,50 @@ func TestSlotQueueBasic2(t *testing.T) { } } -func statsHelperSet(runC, startC, waitC, runL, startL, waitL uint64) slotQueueStats { +func statsHelperSet(runC, startC, waitC, idleC uint64) slotQueueStats { return slotQueueStats{ - states: [SlotQueueLast]uint64{runC, startC, waitC}, - latencies: [SlotQueueLast]uint64{runL, startL, waitL}, + states: [SlotQueueLast]uint64{runC, startC, waitC, idleC}, } } func TestSlotNewContainerLogic1(t *testing.T) { var cur slotQueueStats - var prev slotQueueStats - cur = statsHelperSet(0, 0, 0, 0, 0, 0) - prev = statsHelperSet(0, 0, 0, 0, 0, 0) - // CASE I: There's no one waiting despite cur == prev - if isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should not need a new container cur: %#v prev: %#v", cur, prev) + cur = statsHelperSet(0, 0, 0, 0) + // CASE: There's no one waiting + if isNewContainerNeeded(&cur) { + t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE II: There are starters >= waiters - cur = statsHelperSet(0, 10, 1, 0, 0, 0) - prev = statsHelperSet(0, 10, 1, 0, 0, 0) - if isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should not need a new container cur: %#v prev: %#v", cur, prev) + // CASE: There are starters >= waiters + cur = statsHelperSet(1, 10, 10, 0) + if isNewContainerNeeded(&cur) { + t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE III: no executors - cur = statsHelperSet(0, 0, 1, 0, 0, 0) - prev = statsHelperSet(0, 0, 1, 0, 0, 0) - if !isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) + // CASE: There are starters < waiters + cur = statsHelperSet(1, 5, 10, 0) + if !isNewContainerNeeded(&cur) { + t.Fatalf("Should need a new container cur: %#v", cur) } - // CASE IV: cur == prev same, progress has stalled, with waiters and - // small num of executors - cur = statsHelperSet(2, 0, 10, 0, 0, 0) - prev = statsHelperSet(2, 0, 10, 0, 0, 0) - if !isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) + // CASE: effective waiters 0 (idle = waiter = 10) + cur = statsHelperSet(11, 0, 10, 10) + if isNewContainerNeeded(&cur) { + t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE V: cur != prev, runLat/executors*2 < waitLat - // Let's make cur and prev unequal to prevent blocked progress detection - cur = statsHelperSet(2, 0, 10, 12, 100, 13) - prev = statsHelperSet(2, 0, 10, 12, 101, 13) - if !isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) + // CASE: effective waiters > 0 (idle = 5 waiter = 10) + cur = statsHelperSet(11, 0, 10, 5) + if !isNewContainerNeeded(&cur) { + t.Fatalf("Should need a new container cur: %#v", cur) } - // CASE VI: cur != prev, runLat < waitLat - cur = statsHelperSet(1, 0, 10, 12, 100, 14) - prev = statsHelperSet(1, 0, 10, 12, 101, 14) - if !isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) - } - - // CAST VII: cur != prev, startLat < waitLat - cur = statsHelperSet(1, 0, 10, 2, 10, 20) - prev = statsHelperSet(1, 0, 10, 1, 11, 20) - if !isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should need a new container cur: %#v prev: %#v", cur, prev) - } - - // CAST VIII: cur != prev, fallback - cur = statsHelperSet(1, 0, 10, 2, 10, 2) - prev = statsHelperSet(1, 0, 10, 1, 11, 2) - if isNewContainerNeeded(&cur, &prev) { - t.Fatalf("Should not need a new container cur: %#v prev: %#v", cur, prev) + // CASE: no executors, but 1 waiter + cur = statsHelperSet(0, 0, 1, 0) + if !isNewContainerNeeded(&cur) { + t.Fatalf("Should need a new container cur: %#v", cur) } }