From 97d78c584b33d8238281d65898c5de905a49f663 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 26 Jan 2018 12:21:11 -0800 Subject: [PATCH] fn: better slot/container/request state tracking (#719) * fn: better slot/container/request state tracking --- api/agent/agent.go | 86 +++++++++++++------- api/agent/call.go | 18 +++-- api/agent/slots.go | 107 ++++++++++++------------- api/agent/slots_test.go | 29 +++---- api/agent/state_trackers.go | 152 ++++++++++++++++++++++++++++++++++++ api/agent/stats.go | 65 ++++++++------- api/common/metrics.go | 10 +++ 7 files changed, 331 insertions(+), 136 deletions(-) create mode 100644 api/agent/state_trackers.go diff --git a/api/agent/agent.go b/api/agent/agent.go index fde19a6b5..87d69bc2a 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -180,9 +180,32 @@ func (a *agent) Submit(callI Call) error { return err } +func (a *agent) startStateTrackers(ctx context.Context, call *call) { + + if !protocol.IsStreamable(protocol.Protocol(call.Format)) { + // For cold containers, we track the container state in call + call.containerState = NewContainerState() + } + + call.requestState = NewRequestState() +} + +func (a *agent) endStateTrackers(ctx context.Context, call *call) { + + call.requestState.UpdateState(ctx, RequestStateDone, call.slots) + + // For cold containers, we are done with the container. + if call.containerState != nil { + call.containerState.UpdateState(ctx, ContainerStateDone, call.slots) + } +} + func (a *agent) submit(ctx context.Context, call *call) error { a.stats.Enqueue(ctx, call.AppName, call.Path) + a.startStateTrackers(ctx, call) + defer a.endStateTrackers(ctx, call) + slot, err := a.getSlot(ctx, call) if err != nil { a.handleStatsDequeue(ctx, call, err) @@ -226,10 +249,10 @@ func transformTimeout(e error, isRetriable bool) error { func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) { if err == context.DeadlineExceeded { a.stats.Dequeue(ctx, call.AppName, call.Path) - a.stats.IncrementTooBusy(ctx) + IncrementTooBusy(ctx) } else { a.stats.DequeueAndFail(ctx, call.AppName, call.Path) - a.stats.IncrementErrors(ctx) + IncrementErrors(ctx) } } @@ -243,9 +266,9 @@ func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) { a.stats.Failed(ctx, call.AppName, call.Path) // increment the timeout or errors count, as appropriate if err == context.DeadlineExceeded { - a.stats.IncrementTimedout(ctx) + IncrementTimedout(ctx) } else { - a.stats.IncrementErrors(ctx) + IncrementErrors(ctx) } } } @@ -282,22 +305,19 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot") defer span.Finish() - isHot := protocol.IsStreamable(protocol.Protocol(call.Format)) - if isHot { - start := time.Now() - + if protocol.IsStreamable(protocol.Protocol(call.Format)) { // For hot requests, we use a long lived slot queue, which we use to manage hot containers var isNew bool call.slots, isNew = a.slotMgr.getSlotQueue(call) + call.requestState.UpdateState(ctx, RequestStateWait, call.slots) if isNew { go a.hotLauncher(ctx, call) } - s, err := a.waitHot(ctx, call) - call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000)) return s, err } + call.requestState.UpdateState(ctx, RequestStateWait, call.slots) return a.launchCold(ctx, call) } @@ -351,18 +371,24 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) { if !isNeeded { return } - common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Info("Hot function launcher starting hot container") + + state := NewContainerState() + state.UpdateState(ctx, ContainerStateWait, call.slots) + + common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container") select { case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): a.wg.Add(1) // add waiter in this thread go func() { // NOTE: runHot will not inherit the timeout from ctx (ignore timings) - a.runHot(ctx, call, tok) + a.runHot(ctx, call, tok, state) a.wg.Done() }() case <-ctx.Done(): // timeout + state.UpdateState(ctx, ContainerStateDone, call.slots) case <-a.shutdown: // server shutdown + state.UpdateState(ctx, ContainerStateDone, call.slots) } } @@ -418,6 +444,8 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_launch_cold") defer span.Finish() + call.containerState.UpdateState(ctx, ContainerStateWait, call.slots) + select { case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): go a.prepCold(ctx, call, tok, ch) @@ -453,6 +481,9 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_cold_exec") defer span.Finish() + call.requestState.UpdateState(ctx, RequestStateExec, call.slots) + call.containerState.UpdateState(ctx, ContainerStateBusy, call.slots) + waiter, err := s.cookie.Run(ctx) if err != nil { return err @@ -504,12 +535,11 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error { span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec") defer span.Finish() + call.requestState.UpdateState(ctx, RequestStateExec, call.slots) + // link the container id and id in the logs [for us!] common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call") - start := time.Now() - defer func() { call.slots.recordLatency(SlotQueueRunner, uint64(time.Now().Sub(start).Seconds()*1000)) }() - // swap in the new stderr logger & stat accumulator oldStderr := s.container.swap(call.stderr, &call.Stats) defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races @@ -542,6 +572,8 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold") defer span.Finish() + call.containerState.UpdateState(ctx, ContainerStateStart, call.slots) + // add additional headers to the config to shove everything into env vars for cold for k, v := range call.Headers { if !specialHeader(k) { @@ -568,6 +600,9 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch // pull & create container before we return a slot, so as to be friendly // about timing out if this takes a while... cookie, err := a.driver.Prepare(ctx, container) + + call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots) + slot := &coldSlot{cookie, tok, err} select { case ch <- slot: @@ -576,7 +611,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch } } -func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { +func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ContainerState) { // IMPORTANT: get a context that has a child span / logger but NO timeout // TODO this is a 'FollowsFrom' ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx)) @@ -590,8 +625,8 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead) - start := time.Now() - call.slots.enterState(SlotQueueStarter) + state.UpdateState(ctx, ContainerStateStart, call.slots) + defer state.UpdateState(ctx, ContainerStateDone, call.slots) cid := id.New().String() @@ -617,7 +652,6 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { cookie, err := a.driver.Prepare(ctx, container) if err != nil { - call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000)) call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) return } @@ -625,15 +659,12 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { waiter, err := cookie.Run(ctx) if err != nil { - call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000)) call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) return } // container is running - call.slots.enterState(SlotQueueRunner) - call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000)) - defer call.slots.exitState(SlotQueueRunner) + state.UpdateState(ctx, ContainerStateIdle, call.slots) // buffered, in case someone has slot when waiter returns but isn't yet listening errC := make(chan error, 1) @@ -653,30 +684,27 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { } done := make(chan struct{}) - start := time.Now() - call.slots.enterState(SlotQueueIdle) + state.UpdateState(ctx, ContainerStateIdle, call.slots) s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil}) select { case <-s.trigger: - call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) case <-time.After(time.Duration(call.IdleTimeout) * time.Second): if call.slots.ejectSlot(s) { - call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) logger.Info("Canceling inactive hot function") return } case <-ctx.Done(): // container shutdown if call.slots.ejectSlot(s) { - call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) return } case <-a.shutdown: // server shutdown if call.slots.ejectSlot(s) { - call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000)) return } } + + state.UpdateState(ctx, ContainerStateBusy, call.slots) // IMPORTANT: if we fail to eject the slot, it means that a consumer // just dequeued this and acquired the slot. In other words, we were // late in ejectSlots(), so we have to execute this request in this diff --git a/api/agent/call.go b/api/agent/call.go index 5bbbe43cd..480e7ebcf 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -249,14 +249,16 @@ func (a *agent) GetCall(opts ...CallOpt) (Call, error) { type call struct { *models.Call - da DataAccess - w io.Writer - req *http.Request - stderr io.ReadWriteCloser - ct callTrigger - slots *slotQueue - slotDeadline time.Time - execDeadline time.Time + da DataAccess + w io.Writer + req *http.Request + stderr io.ReadWriteCloser + ct callTrigger + slots *slotQueue + slotDeadline time.Time + execDeadline time.Time + requestState RequestState + containerState ContainerState } func (c *call) Model() *models.Call { return c.Call } diff --git a/api/agent/slots.go b/api/agent/slots.go index 478053cf4..a71f0e820 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -27,20 +27,10 @@ type slotQueueMgr struct { hot map[string]*slotQueue } -type SlotQueueMetricType int - -const ( - SlotQueueRunner SlotQueueMetricType = iota // container is running - SlotQueueStarter // container is launching - SlotQueueWaiter // requests are waiting - SlotQueueIdle // hot container is running, but idle (free tokens) - SlotQueueLast -) - -// counters per state and moving avg of time spent in each state +// request and container states type slotQueueStats struct { - states [SlotQueueLast]uint64 - latencies [SlotQueueLast]uint64 + requestStates [RequestStateMax]uint64 + containerStates [ContainerStateMax]uint64 } type slotToken struct { @@ -178,13 +168,20 @@ func (a *slotQueue) queueSlot(slot Slot) *slotToken { // isIdle() returns true is there's no activity for this slot queue. This // means no one is waiting, running or starting. func (a *slotQueue) isIdle() bool { - var partySize uint64 + var isIdle bool a.statsLock.Lock() - partySize = a.stats.states[SlotQueueWaiter] + a.stats.states[SlotQueueStarter] + a.stats.states[SlotQueueRunner] + + isIdle = a.stats.requestStates[RequestStateWait] == 0 && + a.stats.requestStates[RequestStateExec] == 0 && + a.stats.containerStates[ContainerStateWait] == 0 && + a.stats.containerStates[ContainerStateStart] == 0 && + a.stats.containerStates[ContainerStateIdle] == 0 && + a.stats.containerStates[ContainerStateBusy] == 0 + a.statsLock.Unlock() - return partySize == 0 + return isIdle } func (a *slotQueue) getStats() slotQueueStats { @@ -197,66 +194,65 @@ func (a *slotQueue) getStats() slotQueueStats { func isNewContainerNeeded(cur *slotQueueStats) bool { - idlers := cur.states[SlotQueueIdle] - starters := cur.states[SlotQueueStarter] - waiters := cur.states[SlotQueueWaiter] + idleWorkers := cur.containerStates[ContainerStateIdle] + starters := cur.containerStates[ContainerStateStart] + startWaiters := cur.containerStates[ContainerStateWait] + + queuedRequests := cur.requestStates[RequestStateWait] // we expect idle containers to immediately pick up // any waiters. We assume non-idle containers busy. effectiveWaiters := uint64(0) - if idlers < waiters { - effectiveWaiters = waiters - idlers + if idleWorkers < queuedRequests { + effectiveWaiters = queuedRequests - idleWorkers } if effectiveWaiters == 0 { return false } + // we expect resource waiters to eventually transition + // into starters. + effectiveStarters := starters + startWaiters + // if containers are starting, do not start more than effective waiters - if starters > 0 && starters >= effectiveWaiters { + if effectiveStarters > 0 && effectiveStarters >= effectiveWaiters { return false } return true } -func (a *slotQueue) enterState(metricIdx SlotQueueMetricType) { - a.statsLock.Lock() - a.stats.states[metricIdx] += 1 - a.statsLock.Unlock() -} - -func (a *slotQueue) exitState(metricIdx SlotQueueMetricType) { - a.statsLock.Lock() - if a.stats.states[metricIdx] == 0 { - panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx)) +func (a *slotQueue) enterRequestState(reqType RequestStateType) { + if reqType > RequestStateNone && reqType < RequestStateMax { + a.statsLock.Lock() + a.stats.requestStates[reqType] += 1 + a.statsLock.Unlock() } - a.stats.states[metricIdx] -= 1 - a.statsLock.Unlock() } -func (a *slotQueue) recordLatencyLocked(metricIdx SlotQueueMetricType, latency uint64) { - // exponentially weighted moving average with smoothing factor of 0.5 - // 0.5 is a high value to age older observations fast while filtering - // some noise. For our purposes, newer observations are much more important - // than older, but we still would like to low pass some noise. - a.stats.latencies[metricIdx] = (a.stats.latencies[metricIdx]*5 + latency*5) / 10 -} - -func (a *slotQueue) recordLatency(metricIdx SlotQueueMetricType, latency uint64) { - a.statsLock.Lock() - a.recordLatencyLocked(metricIdx, latency) - a.statsLock.Unlock() -} - -func (a *slotQueue) exitStateWithLatency(metricIdx SlotQueueMetricType, latency uint64) { - a.statsLock.Lock() - if a.stats.states[metricIdx] == 0 { - panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx)) +func (a *slotQueue) exitRequestState(reqType RequestStateType) { + if reqType > RequestStateNone && reqType < RequestStateMax { + a.statsLock.Lock() + a.stats.requestStates[reqType] -= 1 + a.statsLock.Unlock() + } +} + +func (a *slotQueue) enterContainerState(conType ContainerStateType) { + if conType > ContainerStateNone && conType < ContainerStateMax { + a.statsLock.Lock() + a.stats.containerStates[conType] += 1 + a.statsLock.Unlock() + } +} + +func (a *slotQueue) exitContainerState(conType ContainerStateType) { + if conType > ContainerStateNone && conType < ContainerStateMax { + a.statsLock.Lock() + a.stats.containerStates[conType] -= 1 + a.statsLock.Unlock() } - a.stats.states[metricIdx] -= 1 - a.recordLatencyLocked(metricIdx, latency) - a.statsLock.Unlock() } // getSlot must ensure that if it receives a slot, it will be returned, otherwise @@ -271,7 +267,6 @@ func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) { slots = NewSlotQueue(key) a.hot[key] = slots } - slots.enterState(SlotQueueWaiter) a.hMu.Unlock() return slots, !ok diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index 6b331f31d..694ae4304 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -174,9 +174,10 @@ func TestSlotQueueBasic2(t *testing.T) { } } -func statsHelperSet(runC, startC, waitC, idleC uint64) slotQueueStats { +func statsHelperSet(reqW, reqE, conW, conS, conI, conB uint64) slotQueueStats { return slotQueueStats{ - states: [SlotQueueLast]uint64{runC, startC, waitC, idleC}, + requestStates: [RequestStateMax]uint64{0, reqW, reqE, 0}, + containerStates: [ContainerStateMax]uint64{0, conW, conS, conI, conB, 0}, } } @@ -184,38 +185,38 @@ func TestSlotNewContainerLogic1(t *testing.T) { var cur slotQueueStats - cur = statsHelperSet(0, 0, 0, 0) - // CASE: There's no one waiting + cur = statsHelperSet(0, 0, 0, 0, 0, 0) + // CASE: There's no queued requests if isNewContainerNeeded(&cur) { t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE: There are starters >= waiters - cur = statsHelperSet(1, 10, 10, 0) + // CASE: There are starters >= queued requests + cur = statsHelperSet(1, 0, 0, 10, 0, 0) if isNewContainerNeeded(&cur) { t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE: There are starters < waiters - cur = statsHelperSet(1, 5, 10, 0) + // CASE: There are starters < queued requests + cur = statsHelperSet(10, 0, 0, 1, 0, 0) if !isNewContainerNeeded(&cur) { t.Fatalf("Should need a new container cur: %#v", cur) } - // CASE: effective waiters 0 (idle = waiter = 10) - cur = statsHelperSet(11, 0, 10, 10) + // CASE: effective queued requests (idle > requests) + cur = statsHelperSet(10, 0, 0, 0, 11, 0) if isNewContainerNeeded(&cur) { t.Fatalf("Should not need a new container cur: %#v", cur) } - // CASE: effective waiters > 0 (idle = 5 waiter = 10) - cur = statsHelperSet(11, 0, 10, 5) + // CASE: effective queued requests (idle < requests) + cur = statsHelperSet(10, 0, 0, 0, 5, 0) if !isNewContainerNeeded(&cur) { t.Fatalf("Should need a new container cur: %#v", cur) } - // CASE: no executors, but 1 waiter - cur = statsHelperSet(0, 0, 1, 0) + // CASE: no executors, but 1 queued request + cur = statsHelperSet(1, 0, 0, 0, 0, 0) if !isNewContainerNeeded(&cur) { t.Fatalf("Should need a new container cur: %#v", cur) } diff --git a/api/agent/state_trackers.go b/api/agent/state_trackers.go new file mode 100644 index 000000000..a22f29e9e --- /dev/null +++ b/api/agent/state_trackers.go @@ -0,0 +1,152 @@ +package agent + +import ( + "context" + "sync" + "time" + + "github.com/fnproject/fn/api/common" +) + +type RequestStateType int +type ContainerStateType int + +type containerState struct { + lock sync.Mutex + state ContainerStateType + start time.Time +} + +type requestState struct { + lock sync.Mutex + state RequestStateType + start time.Time +} + +type ContainerState interface { + UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue) +} +type RequestState interface { + UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue) +} + +func NewRequestState() RequestState { + return &requestState{} +} + +func NewContainerState() ContainerState { + return &containerState{} +} + +const ( + RequestStateNone RequestStateType = iota // uninitialized + RequestStateWait // request is waiting + RequestStateExec // request is executing + RequestStateDone // request is done + RequestStateMax +) + +const ( + ContainerStateNone ContainerStateType = iota // uninitialized + ContainerStateWait // resource (cpu + mem) waiting + ContainerStateStart // launching + ContainerStateIdle // running idle + ContainerStateBusy // running busy + ContainerStateDone // exited/failed/done + ContainerStateMax +) + +var containerGaugeKeys = [ContainerStateMax]string{ + "", + "container_wait_total", + "container_start_total", + "container_idle_total", + "container_busy_total", + "container_done_total", +} +var containerTimeKeys = [ContainerStateMax]string{ + "", + "container_wait_duration_seconds", + "container_start_duration_seconds", + "container_idle_duration_seconds", + "container_busy_duration_seconds", +} + +func (c *requestState) UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue) { + + var now time.Time + var oldState RequestStateType + + c.lock.Lock() + + // we can only advance our state forward + if c.state < newState { + + now = time.Now() + oldState = c.state + c.state = newState + c.start = now + } + + c.lock.Unlock() + + if now.IsZero() { + return + } + + // reflect this change to slot mgr if defined (AKA hot) + if slots != nil { + slots.enterRequestState(newState) + slots.exitRequestState(oldState) + } +} + +func (c *containerState) UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue) { + + var now time.Time + var oldState ContainerStateType + var before time.Time + + c.lock.Lock() + + // except for 1) switching back to idle from busy (hot containers) or 2) + // to waiting from done, otherwise we can only move forward in states + if c.state < newState || + (c.state == ContainerStateBusy && newState == ContainerStateIdle) || + (c.state == ContainerStateDone && newState == ContainerStateIdle) { + + now = time.Now() + oldState = c.state + before = c.start + c.state = newState + c.start = now + } + + c.lock.Unlock() + + if now.IsZero() { + return + } + + // reflect this change to slot mgr if defined (AKA hot) + if slots != nil { + slots.enterContainerState(newState) + slots.exitContainerState(oldState) + } + + // update old state stats + gaugeKey := containerGaugeKeys[oldState] + if gaugeKey != "" { + common.DecrementGauge(ctx, gaugeKey) + } + timeKey := containerTimeKeys[oldState] + if timeKey != "" { + common.PublishElapsedTimeHistogram(ctx, timeKey, before, now) + } + + // update new state stats + gaugeKey = containerGaugeKeys[newState] + if gaugeKey != "" { + common.IncrementGauge(ctx, gaugeKey) + } +} diff --git a/api/agent/stats.go b/api/agent/stats.go index 3a7e2155c..eacd1af44 100644 --- a/api/agent/stats.go +++ b/api/agent/stats.go @@ -65,91 +65,98 @@ func (s *stats) getStatsForFunction(path string) *functionStats { func (s *stats) Enqueue(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.queue++ - s.getStatsForFunction(path).queue++ - common.IncrementGauge(ctx, queuedMetricName) - - common.IncrementCounter(ctx, callsMetricName) + fstats.queue++ s.mu.Unlock() + + common.IncrementGauge(ctx, queuedMetricName) + common.IncrementCounter(ctx, callsMetricName) } // Call when a function has been queued but cannot be started because of an error func (s *stats) Dequeue(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.queue-- - s.getStatsForFunction(path).queue-- - common.DecrementGauge(ctx, queuedMetricName) + fstats.queue-- s.mu.Unlock() + + common.DecrementGauge(ctx, queuedMetricName) } func (s *stats) DequeueAndStart(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.queue-- - s.getStatsForFunction(path).queue-- - common.DecrementGauge(ctx, queuedMetricName) - s.running++ - s.getStatsForFunction(path).running++ - common.IncrementGauge(ctx, runningMetricName) + fstats.queue-- + fstats.running++ s.mu.Unlock() + + common.DecrementGauge(ctx, queuedMetricName) + common.IncrementGauge(ctx, runningMetricName) } func (s *stats) Complete(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.running-- - s.getStatsForFunction(path).running-- - common.DecrementGauge(ctx, runningMetricName) - s.complete++ - s.getStatsForFunction(path).complete++ - common.IncrementCounter(ctx, completedMetricName) + fstats.running-- + fstats.complete++ s.mu.Unlock() + + common.DecrementGauge(ctx, runningMetricName) + common.IncrementCounter(ctx, completedMetricName) } func (s *stats) Failed(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.running-- - s.getStatsForFunction(path).running-- - common.DecrementGauge(ctx, runningMetricName) - s.failed++ - s.getStatsForFunction(path).failed++ - common.IncrementCounter(ctx, failedMetricName) + fstats.running-- + fstats.failed++ s.mu.Unlock() + + common.DecrementGauge(ctx, runningMetricName) + common.IncrementCounter(ctx, failedMetricName) } func (s *stats) DequeueAndFail(ctx context.Context, app string, path string) { s.mu.Lock() + fstats := s.getStatsForFunction(path) s.queue-- - s.getStatsForFunction(path).queue-- - common.DecrementGauge(ctx, queuedMetricName) - s.failed++ - s.getStatsForFunction(path).failed++ - common.IncrementCounter(ctx, failedMetricName) + fstats.queue-- + fstats.failed++ s.mu.Unlock() + + common.DecrementGauge(ctx, queuedMetricName) + common.IncrementCounter(ctx, failedMetricName) } -func (s *stats) IncrementTimedout(ctx context.Context) { +func IncrementTimedout(ctx context.Context) { common.IncrementCounter(ctx, timedoutMetricName) } -func (s *stats) IncrementErrors(ctx context.Context) { +func IncrementErrors(ctx context.Context) { common.IncrementCounter(ctx, errorsMetricName) } -func (s *stats) IncrementTooBusy(ctx context.Context) { +func IncrementTooBusy(ctx context.Context) { common.IncrementCounter(ctx, serverBusyMetricName) } diff --git a/api/common/metrics.go b/api/common/metrics.go index 712675ccc..42f1e997e 100644 --- a/api/common/metrics.go +++ b/api/common/metrics.go @@ -2,6 +2,8 @@ package common import ( "context" + "time" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/log" ) @@ -106,6 +108,14 @@ func PublishHistogramToSpan(span opentracing.Span, key string, value float64) { span.LogFields(log.Float64(fieldname, value)) } +// PublishElapsedTimeToSpan publishes the specifed histogram elapsed time since start +// It does this by logging an appropriate field value to a tracing span +// Use this when the current tracing span is long-lived and you want the metric to be visible before it ends +func PublishElapsedTimeHistogram(ctx context.Context, key string, start, end time.Time) { + elapsed := float64(end.Sub(start).Seconds()) + PublishHistogram(ctx, key, elapsed) +} + const ( // FnPrefix is a constant for "fn_", used as a prefix for span names, field names, Prometheus metric names and Prometheus label names