diff --git a/api/agent/agent.go b/api/agent/agent.go index d50a66643..55009c518 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -173,7 +173,7 @@ func (a *agent) Submit(callI Call) error { default: } - a.stats.Enqueue() + a.stats.Enqueue(callI.Model().Path) call := callI.(*call) ctx := call.req.Context() @@ -188,6 +188,7 @@ func (a *agent) Submit(callI Call) error { slot, err := a.getSlot(ctx, call) // find ram available / running if err != nil { + a.stats.Dequeue(callI.Model().Path) return err } // TODO if the call times out & container is created, we need @@ -197,16 +198,17 @@ func (a *agent) Submit(callI Call) error { // TODO Start is checking the timer now, we could do it here, too. err = call.Start(ctx) if err != nil { + a.stats.Dequeue(callI.Model().Path) return err } - a.stats.Start() + a.stats.Start(callI.Model().Path) err = slot.exec(ctx, call) // pass this error (nil or otherwise) to end directly, to store status, etc // End may rewrite the error or elect to return it - a.stats.Complete() + a.stats.Complete(callI.Model().Path) // TODO: we need to allocate more time to store the call + logs in case the call timed out, // but this could put us over the timeout if the call did not reply yet (need better policy). diff --git a/api/agent/stats.go b/api/agent/stats.go index 3dc4f08ac..c8a70866a 100644 --- a/api/agent/stats.go +++ b/api/agent/stats.go @@ -6,36 +6,83 @@ import "sync" // * hot containers active // * memory used / available +// global statistics type stats struct { - mu sync.Mutex + mu sync.Mutex + // statistics for all functions combined + queue uint64 + running uint64 + complete uint64 + // statistics for individual functions, keyed by function path + functionStatsMap map[string]*functionStats +} + +// statistics for an individual function +type functionStats struct { queue uint64 running uint64 complete uint64 } type Stats struct { + // statistics for all functions combined + Queue uint64 + Running uint64 + Complete uint64 + // statistics for individual functions, keyed by function path + FunctionStatsMap map[string]*FunctionStats +} + +// statistics for an individual function +type FunctionStats struct { Queue uint64 Running uint64 Complete uint64 } -func (s *stats) Enqueue() { +func (s *stats) getStatsForFunction(path string) *functionStats { + if s.functionStatsMap == nil { + s.functionStatsMap = make(map[string]*functionStats) + } + thisFunctionStats, found := s.functionStatsMap[path] + if !found { + thisFunctionStats = &functionStats{} + s.functionStatsMap[path] = thisFunctionStats + } + + return thisFunctionStats +} + +func (s *stats) Enqueue(path string) { s.mu.Lock() s.queue++ + s.getStatsForFunction(path).queue++ s.mu.Unlock() } -func (s *stats) Start() { +// Call when a function has been queued but cannot be started because of an error +func (s *stats) Dequeue(path string) { s.mu.Lock() s.queue-- - s.running++ + s.getStatsForFunction(path).queue-- s.mu.Unlock() } -func (s *stats) Complete() { +func (s *stats) Start(path string) { + s.mu.Lock() + s.queue-- + s.getStatsForFunction(path).queue-- + s.running++ + s.getStatsForFunction(path).running++ + s.mu.Unlock() +} + +func (s *stats) Complete(path string) { s.mu.Lock() s.running-- + s.getStatsForFunction(path).running-- s.complete++ + s.getStatsForFunction(path).complete++ s.mu.Unlock() } @@ -45,6 +92,11 @@ func (s *stats) Stats() Stats { stats.Running = s.running stats.Complete = s.complete stats.Queue = s.queue + stats.FunctionStatsMap = make(map[string]*FunctionStats) + for key, value := range s.functionStatsMap { + thisFunctionStats := &FunctionStats{Queue: value.queue, Running: value.running, Complete: value.complete} + stats.FunctionStatsMap[key] = thisFunctionStats + } s.mu.Unlock() return stats }