fn: better slot/container/request state tracking (#719)

* fn: better slot/container/request state tracking
This commit is contained in:
Tolga Ceylan
2018-01-26 12:21:11 -08:00
committed by GitHub
parent a7223437df
commit 97d78c584b
7 changed files with 331 additions and 136 deletions

View File

@@ -180,9 +180,32 @@ func (a *agent) Submit(callI Call) error {
return err return err
} }
func (a *agent) startStateTrackers(ctx context.Context, call *call) {
if !protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For cold containers, we track the container state in call
call.containerState = NewContainerState()
}
call.requestState = NewRequestState()
}
func (a *agent) endStateTrackers(ctx context.Context, call *call) {
call.requestState.UpdateState(ctx, RequestStateDone, call.slots)
// For cold containers, we are done with the container.
if call.containerState != nil {
call.containerState.UpdateState(ctx, ContainerStateDone, call.slots)
}
}
func (a *agent) submit(ctx context.Context, call *call) error { func (a *agent) submit(ctx context.Context, call *call) error {
a.stats.Enqueue(ctx, call.AppName, call.Path) a.stats.Enqueue(ctx, call.AppName, call.Path)
a.startStateTrackers(ctx, call)
defer a.endStateTrackers(ctx, call)
slot, err := a.getSlot(ctx, call) slot, err := a.getSlot(ctx, call)
if err != nil { if err != nil {
a.handleStatsDequeue(ctx, call, err) a.handleStatsDequeue(ctx, call, err)
@@ -226,10 +249,10 @@ func transformTimeout(e error, isRetriable bool) error {
func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) { func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) {
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
a.stats.Dequeue(ctx, call.AppName, call.Path) a.stats.Dequeue(ctx, call.AppName, call.Path)
a.stats.IncrementTooBusy(ctx) IncrementTooBusy(ctx)
} else { } else {
a.stats.DequeueAndFail(ctx, call.AppName, call.Path) a.stats.DequeueAndFail(ctx, call.AppName, call.Path)
a.stats.IncrementErrors(ctx) IncrementErrors(ctx)
} }
} }
@@ -243,9 +266,9 @@ func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) {
a.stats.Failed(ctx, call.AppName, call.Path) a.stats.Failed(ctx, call.AppName, call.Path)
// increment the timeout or errors count, as appropriate // increment the timeout or errors count, as appropriate
if err == context.DeadlineExceeded { if err == context.DeadlineExceeded {
a.stats.IncrementTimedout(ctx) IncrementTimedout(ctx)
} else { } else {
a.stats.IncrementErrors(ctx) IncrementErrors(ctx)
} }
} }
} }
@@ -282,22 +305,19 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot")
defer span.Finish() defer span.Finish()
isHot := protocol.IsStreamable(protocol.Protocol(call.Format)) if protocol.IsStreamable(protocol.Protocol(call.Format)) {
if isHot {
start := time.Now()
// For hot requests, we use a long lived slot queue, which we use to manage hot containers // For hot requests, we use a long lived slot queue, which we use to manage hot containers
var isNew bool var isNew bool
call.slots, isNew = a.slotMgr.getSlotQueue(call) call.slots, isNew = a.slotMgr.getSlotQueue(call)
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
if isNew { if isNew {
go a.hotLauncher(ctx, call) go a.hotLauncher(ctx, call)
} }
s, err := a.waitHot(ctx, call) s, err := a.waitHot(ctx, call)
call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000))
return s, err return s, err
} }
call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
return a.launchCold(ctx, call) return a.launchCold(ctx, call)
} }
@@ -351,18 +371,24 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) {
if !isNeeded { if !isNeeded {
return return
} }
common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
state := NewContainerState()
state.UpdateState(ctx, ContainerStateWait, call.slots)
common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
select { select {
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
a.wg.Add(1) // add waiter in this thread a.wg.Add(1) // add waiter in this thread
go func() { go func() {
// NOTE: runHot will not inherit the timeout from ctx (ignore timings) // NOTE: runHot will not inherit the timeout from ctx (ignore timings)
a.runHot(ctx, call, tok) a.runHot(ctx, call, tok, state)
a.wg.Done() a.wg.Done()
}() }()
case <-ctx.Done(): // timeout case <-ctx.Done(): // timeout
state.UpdateState(ctx, ContainerStateDone, call.slots)
case <-a.shutdown: // server shutdown case <-a.shutdown: // server shutdown
state.UpdateState(ctx, ContainerStateDone, call.slots)
} }
} }
@@ -418,6 +444,8 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_launch_cold") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_launch_cold")
defer span.Finish() defer span.Finish()
call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
select { select {
case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
go a.prepCold(ctx, call, tok, ch) go a.prepCold(ctx, call, tok, ch)
@@ -453,6 +481,9 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_cold_exec") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_cold_exec")
defer span.Finish() defer span.Finish()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
call.containerState.UpdateState(ctx, ContainerStateBusy, call.slots)
waiter, err := s.cookie.Run(ctx) waiter, err := s.cookie.Run(ctx)
if err != nil { if err != nil {
return err return err
@@ -504,12 +535,11 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec")
defer span.Finish() defer span.Finish()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
// link the container id and id in the logs [for us!] // link the container id and id in the logs [for us!]
common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call") common.Logger(ctx).WithField("container_id", s.container.id).Info("starting call")
start := time.Now()
defer func() { call.slots.recordLatency(SlotQueueRunner, uint64(time.Now().Sub(start).Seconds()*1000)) }()
// swap in the new stderr logger & stat accumulator // swap in the new stderr logger & stat accumulator
oldStderr := s.container.swap(call.stderr, &call.Stats) oldStderr := s.container.swap(call.stderr, &call.Stats)
defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races defer s.container.swap(oldStderr, nil) // once we're done, swap out in this scope to prevent races
@@ -542,6 +572,8 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold") span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold")
defer span.Finish() defer span.Finish()
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
// add additional headers to the config to shove everything into env vars for cold // add additional headers to the config to shove everything into env vars for cold
for k, v := range call.Headers { for k, v := range call.Headers {
if !specialHeader(k) { if !specialHeader(k) {
@@ -568,6 +600,9 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
// pull & create container before we return a slot, so as to be friendly // pull & create container before we return a slot, so as to be friendly
// about timing out if this takes a while... // about timing out if this takes a while...
cookie, err := a.driver.Prepare(ctx, container) cookie, err := a.driver.Prepare(ctx, container)
call.containerState.UpdateState(ctx, ContainerStateIdle, call.slots)
slot := &coldSlot{cookie, tok, err} slot := &coldSlot{cookie, tok, err}
select { select {
case ch <- slot: case ch <- slot:
@@ -576,7 +611,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
} }
} }
func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) { func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ContainerState) {
// IMPORTANT: get a context that has a child span / logger but NO timeout // IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom' // TODO this is a 'FollowsFrom'
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx)) ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
@@ -590,8 +625,8 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) {
proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead) proto := protocol.New(protocol.Protocol(call.Format), stdinWrite, stdoutRead)
start := time.Now() state.UpdateState(ctx, ContainerStateStart, call.slots)
call.slots.enterState(SlotQueueStarter) defer state.UpdateState(ctx, ContainerStateDone, call.slots)
cid := id.New().String() cid := id.New().String()
@@ -617,7 +652,6 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) {
cookie, err := a.driver.Prepare(ctx, container) cookie, err := a.driver.Prepare(ctx, container)
if err != nil { if err != nil {
call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000))
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err})
return return
} }
@@ -625,15 +659,12 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) {
waiter, err := cookie.Run(ctx) waiter, err := cookie.Run(ctx)
if err != nil { if err != nil {
call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000))
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err})
return return
} }
// container is running // container is running
call.slots.enterState(SlotQueueRunner) state.UpdateState(ctx, ContainerStateIdle, call.slots)
call.slots.exitStateWithLatency(SlotQueueStarter, uint64(time.Now().Sub(start).Seconds()*1000))
defer call.slots.exitState(SlotQueueRunner)
// buffered, in case someone has slot when waiter returns but isn't yet listening // buffered, in case someone has slot when waiter returns but isn't yet listening
errC := make(chan error, 1) errC := make(chan error, 1)
@@ -653,30 +684,27 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken) {
} }
done := make(chan struct{}) done := make(chan struct{})
start := time.Now() state.UpdateState(ctx, ContainerStateIdle, call.slots)
call.slots.enterState(SlotQueueIdle)
s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil}) s := call.slots.queueSlot(&hotSlot{done, proto, errC, container, nil})
select { select {
case <-s.trigger: case <-s.trigger:
call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
case <-time.After(time.Duration(call.IdleTimeout) * time.Second): case <-time.After(time.Duration(call.IdleTimeout) * time.Second):
if call.slots.ejectSlot(s) { if call.slots.ejectSlot(s) {
call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
logger.Info("Canceling inactive hot function") logger.Info("Canceling inactive hot function")
return return
} }
case <-ctx.Done(): // container shutdown case <-ctx.Done(): // container shutdown
if call.slots.ejectSlot(s) { if call.slots.ejectSlot(s) {
call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
return return
} }
case <-a.shutdown: // server shutdown case <-a.shutdown: // server shutdown
if call.slots.ejectSlot(s) { if call.slots.ejectSlot(s) {
call.slots.exitStateWithLatency(SlotQueueIdle, uint64(time.Now().Sub(start).Seconds()*1000))
return return
} }
} }
state.UpdateState(ctx, ContainerStateBusy, call.slots)
// IMPORTANT: if we fail to eject the slot, it means that a consumer // IMPORTANT: if we fail to eject the slot, it means that a consumer
// just dequeued this and acquired the slot. In other words, we were // just dequeued this and acquired the slot. In other words, we were
// late in ejectSlots(), so we have to execute this request in this // late in ejectSlots(), so we have to execute this request in this

View File

@@ -257,6 +257,8 @@ type call struct {
slots *slotQueue slots *slotQueue
slotDeadline time.Time slotDeadline time.Time
execDeadline time.Time execDeadline time.Time
requestState RequestState
containerState ContainerState
} }
func (c *call) Model() *models.Call { return c.Call } func (c *call) Model() *models.Call { return c.Call }

View File

@@ -27,20 +27,10 @@ type slotQueueMgr struct {
hot map[string]*slotQueue hot map[string]*slotQueue
} }
type SlotQueueMetricType int // request and container states
const (
SlotQueueRunner SlotQueueMetricType = iota // container is running
SlotQueueStarter // container is launching
SlotQueueWaiter // requests are waiting
SlotQueueIdle // hot container is running, but idle (free tokens)
SlotQueueLast
)
// counters per state and moving avg of time spent in each state
type slotQueueStats struct { type slotQueueStats struct {
states [SlotQueueLast]uint64 requestStates [RequestStateMax]uint64
latencies [SlotQueueLast]uint64 containerStates [ContainerStateMax]uint64
} }
type slotToken struct { type slotToken struct {
@@ -178,13 +168,20 @@ func (a *slotQueue) queueSlot(slot Slot) *slotToken {
// isIdle() returns true is there's no activity for this slot queue. This // isIdle() returns true is there's no activity for this slot queue. This
// means no one is waiting, running or starting. // means no one is waiting, running or starting.
func (a *slotQueue) isIdle() bool { func (a *slotQueue) isIdle() bool {
var partySize uint64 var isIdle bool
a.statsLock.Lock() a.statsLock.Lock()
partySize = a.stats.states[SlotQueueWaiter] + a.stats.states[SlotQueueStarter] + a.stats.states[SlotQueueRunner]
isIdle = a.stats.requestStates[RequestStateWait] == 0 &&
a.stats.requestStates[RequestStateExec] == 0 &&
a.stats.containerStates[ContainerStateWait] == 0 &&
a.stats.containerStates[ContainerStateStart] == 0 &&
a.stats.containerStates[ContainerStateIdle] == 0 &&
a.stats.containerStates[ContainerStateBusy] == 0
a.statsLock.Unlock() a.statsLock.Unlock()
return partySize == 0 return isIdle
} }
func (a *slotQueue) getStats() slotQueueStats { func (a *slotQueue) getStats() slotQueueStats {
@@ -197,66 +194,65 @@ func (a *slotQueue) getStats() slotQueueStats {
func isNewContainerNeeded(cur *slotQueueStats) bool { func isNewContainerNeeded(cur *slotQueueStats) bool {
idlers := cur.states[SlotQueueIdle] idleWorkers := cur.containerStates[ContainerStateIdle]
starters := cur.states[SlotQueueStarter] starters := cur.containerStates[ContainerStateStart]
waiters := cur.states[SlotQueueWaiter] startWaiters := cur.containerStates[ContainerStateWait]
queuedRequests := cur.requestStates[RequestStateWait]
// we expect idle containers to immediately pick up // we expect idle containers to immediately pick up
// any waiters. We assume non-idle containers busy. // any waiters. We assume non-idle containers busy.
effectiveWaiters := uint64(0) effectiveWaiters := uint64(0)
if idlers < waiters { if idleWorkers < queuedRequests {
effectiveWaiters = waiters - idlers effectiveWaiters = queuedRequests - idleWorkers
} }
if effectiveWaiters == 0 { if effectiveWaiters == 0 {
return false return false
} }
// we expect resource waiters to eventually transition
// into starters.
effectiveStarters := starters + startWaiters
// if containers are starting, do not start more than effective waiters // if containers are starting, do not start more than effective waiters
if starters > 0 && starters >= effectiveWaiters { if effectiveStarters > 0 && effectiveStarters >= effectiveWaiters {
return false return false
} }
return true return true
} }
func (a *slotQueue) enterState(metricIdx SlotQueueMetricType) { func (a *slotQueue) enterRequestState(reqType RequestStateType) {
if reqType > RequestStateNone && reqType < RequestStateMax {
a.statsLock.Lock() a.statsLock.Lock()
a.stats.states[metricIdx] += 1 a.stats.requestStates[reqType] += 1
a.statsLock.Unlock() a.statsLock.Unlock()
}
func (a *slotQueue) exitState(metricIdx SlotQueueMetricType) {
a.statsLock.Lock()
if a.stats.states[metricIdx] == 0 {
panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx))
} }
a.stats.states[metricIdx] -= 1
a.statsLock.Unlock()
} }
func (a *slotQueue) recordLatencyLocked(metricIdx SlotQueueMetricType, latency uint64) { func (a *slotQueue) exitRequestState(reqType RequestStateType) {
// exponentially weighted moving average with smoothing factor of 0.5 if reqType > RequestStateNone && reqType < RequestStateMax {
// 0.5 is a high value to age older observations fast while filtering
// some noise. For our purposes, newer observations are much more important
// than older, but we still would like to low pass some noise.
a.stats.latencies[metricIdx] = (a.stats.latencies[metricIdx]*5 + latency*5) / 10
}
func (a *slotQueue) recordLatency(metricIdx SlotQueueMetricType, latency uint64) {
a.statsLock.Lock() a.statsLock.Lock()
a.recordLatencyLocked(metricIdx, latency) a.stats.requestStates[reqType] -= 1
a.statsLock.Unlock() a.statsLock.Unlock()
}
func (a *slotQueue) exitStateWithLatency(metricIdx SlotQueueMetricType, latency uint64) {
a.statsLock.Lock()
if a.stats.states[metricIdx] == 0 {
panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx))
} }
a.stats.states[metricIdx] -= 1 }
a.recordLatencyLocked(metricIdx, latency)
func (a *slotQueue) enterContainerState(conType ContainerStateType) {
if conType > ContainerStateNone && conType < ContainerStateMax {
a.statsLock.Lock()
a.stats.containerStates[conType] += 1
a.statsLock.Unlock() a.statsLock.Unlock()
}
}
func (a *slotQueue) exitContainerState(conType ContainerStateType) {
if conType > ContainerStateNone && conType < ContainerStateMax {
a.statsLock.Lock()
a.stats.containerStates[conType] -= 1
a.statsLock.Unlock()
}
} }
// getSlot must ensure that if it receives a slot, it will be returned, otherwise // getSlot must ensure that if it receives a slot, it will be returned, otherwise
@@ -271,7 +267,6 @@ func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
slots = NewSlotQueue(key) slots = NewSlotQueue(key)
a.hot[key] = slots a.hot[key] = slots
} }
slots.enterState(SlotQueueWaiter)
a.hMu.Unlock() a.hMu.Unlock()
return slots, !ok return slots, !ok

View File

@@ -174,9 +174,10 @@ func TestSlotQueueBasic2(t *testing.T) {
} }
} }
func statsHelperSet(runC, startC, waitC, idleC uint64) slotQueueStats { func statsHelperSet(reqW, reqE, conW, conS, conI, conB uint64) slotQueueStats {
return slotQueueStats{ return slotQueueStats{
states: [SlotQueueLast]uint64{runC, startC, waitC, idleC}, requestStates: [RequestStateMax]uint64{0, reqW, reqE, 0},
containerStates: [ContainerStateMax]uint64{0, conW, conS, conI, conB, 0},
} }
} }
@@ -184,38 +185,38 @@ func TestSlotNewContainerLogic1(t *testing.T) {
var cur slotQueueStats var cur slotQueueStats
cur = statsHelperSet(0, 0, 0, 0) cur = statsHelperSet(0, 0, 0, 0, 0, 0)
// CASE: There's no one waiting // CASE: There's no queued requests
if isNewContainerNeeded(&cur) { if isNewContainerNeeded(&cur) {
t.Fatalf("Should not need a new container cur: %#v", cur) t.Fatalf("Should not need a new container cur: %#v", cur)
} }
// CASE: There are starters >= waiters // CASE: There are starters >= queued requests
cur = statsHelperSet(1, 10, 10, 0) cur = statsHelperSet(1, 0, 0, 10, 0, 0)
if isNewContainerNeeded(&cur) { if isNewContainerNeeded(&cur) {
t.Fatalf("Should not need a new container cur: %#v", cur) t.Fatalf("Should not need a new container cur: %#v", cur)
} }
// CASE: There are starters < waiters // CASE: There are starters < queued requests
cur = statsHelperSet(1, 5, 10, 0) cur = statsHelperSet(10, 0, 0, 1, 0, 0)
if !isNewContainerNeeded(&cur) { if !isNewContainerNeeded(&cur) {
t.Fatalf("Should need a new container cur: %#v", cur) t.Fatalf("Should need a new container cur: %#v", cur)
} }
// CASE: effective waiters 0 (idle = waiter = 10) // CASE: effective queued requests (idle > requests)
cur = statsHelperSet(11, 0, 10, 10) cur = statsHelperSet(10, 0, 0, 0, 11, 0)
if isNewContainerNeeded(&cur) { if isNewContainerNeeded(&cur) {
t.Fatalf("Should not need a new container cur: %#v", cur) t.Fatalf("Should not need a new container cur: %#v", cur)
} }
// CASE: effective waiters > 0 (idle = 5 waiter = 10) // CASE: effective queued requests (idle < requests)
cur = statsHelperSet(11, 0, 10, 5) cur = statsHelperSet(10, 0, 0, 0, 5, 0)
if !isNewContainerNeeded(&cur) { if !isNewContainerNeeded(&cur) {
t.Fatalf("Should need a new container cur: %#v", cur) t.Fatalf("Should need a new container cur: %#v", cur)
} }
// CASE: no executors, but 1 waiter // CASE: no executors, but 1 queued request
cur = statsHelperSet(0, 0, 1, 0) cur = statsHelperSet(1, 0, 0, 0, 0, 0)
if !isNewContainerNeeded(&cur) { if !isNewContainerNeeded(&cur) {
t.Fatalf("Should need a new container cur: %#v", cur) t.Fatalf("Should need a new container cur: %#v", cur)
} }

152
api/agent/state_trackers.go Normal file
View File

@@ -0,0 +1,152 @@
package agent
import (
"context"
"sync"
"time"
"github.com/fnproject/fn/api/common"
)
type RequestStateType int
type ContainerStateType int
type containerState struct {
lock sync.Mutex
state ContainerStateType
start time.Time
}
type requestState struct {
lock sync.Mutex
state RequestStateType
start time.Time
}
type ContainerState interface {
UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
}
type RequestState interface {
UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}
func NewRequestState() RequestState {
return &requestState{}
}
func NewContainerState() ContainerState {
return &containerState{}
}
const (
RequestStateNone RequestStateType = iota // uninitialized
RequestStateWait // request is waiting
RequestStateExec // request is executing
RequestStateDone // request is done
RequestStateMax
)
const (
ContainerStateNone ContainerStateType = iota // uninitialized
ContainerStateWait // resource (cpu + mem) waiting
ContainerStateStart // launching
ContainerStateIdle // running idle
ContainerStateBusy // running busy
ContainerStateDone // exited/failed/done
ContainerStateMax
)
var containerGaugeKeys = [ContainerStateMax]string{
"",
"container_wait_total",
"container_start_total",
"container_idle_total",
"container_busy_total",
"container_done_total",
}
var containerTimeKeys = [ContainerStateMax]string{
"",
"container_wait_duration_seconds",
"container_start_duration_seconds",
"container_idle_duration_seconds",
"container_busy_duration_seconds",
}
func (c *requestState) UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue) {
var now time.Time
var oldState RequestStateType
c.lock.Lock()
// we can only advance our state forward
if c.state < newState {
now = time.Now()
oldState = c.state
c.state = newState
c.start = now
}
c.lock.Unlock()
if now.IsZero() {
return
}
// reflect this change to slot mgr if defined (AKA hot)
if slots != nil {
slots.enterRequestState(newState)
slots.exitRequestState(oldState)
}
}
func (c *containerState) UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue) {
var now time.Time
var oldState ContainerStateType
var before time.Time
c.lock.Lock()
// except for 1) switching back to idle from busy (hot containers) or 2)
// to waiting from done, otherwise we can only move forward in states
if c.state < newState ||
(c.state == ContainerStateBusy && newState == ContainerStateIdle) ||
(c.state == ContainerStateDone && newState == ContainerStateIdle) {
now = time.Now()
oldState = c.state
before = c.start
c.state = newState
c.start = now
}
c.lock.Unlock()
if now.IsZero() {
return
}
// reflect this change to slot mgr if defined (AKA hot)
if slots != nil {
slots.enterContainerState(newState)
slots.exitContainerState(oldState)
}
// update old state stats
gaugeKey := containerGaugeKeys[oldState]
if gaugeKey != "" {
common.DecrementGauge(ctx, gaugeKey)
}
timeKey := containerTimeKeys[oldState]
if timeKey != "" {
common.PublishElapsedTimeHistogram(ctx, timeKey, before, now)
}
// update new state stats
gaugeKey = containerGaugeKeys[newState]
if gaugeKey != "" {
common.IncrementGauge(ctx, gaugeKey)
}
}

View File

@@ -65,91 +65,98 @@ func (s *stats) getStatsForFunction(path string) *functionStats {
func (s *stats) Enqueue(ctx context.Context, app string, path string) { func (s *stats) Enqueue(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.queue++ s.queue++
s.getStatsForFunction(path).queue++ fstats.queue++
common.IncrementGauge(ctx, queuedMetricName)
common.IncrementCounter(ctx, callsMetricName)
s.mu.Unlock() s.mu.Unlock()
common.IncrementGauge(ctx, queuedMetricName)
common.IncrementCounter(ctx, callsMetricName)
} }
// Call when a function has been queued but cannot be started because of an error // Call when a function has been queued but cannot be started because of an error
func (s *stats) Dequeue(ctx context.Context, app string, path string) { func (s *stats) Dequeue(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.queue-- s.queue--
s.getStatsForFunction(path).queue-- fstats.queue--
common.DecrementGauge(ctx, queuedMetricName)
s.mu.Unlock() s.mu.Unlock()
common.DecrementGauge(ctx, queuedMetricName)
} }
func (s *stats) DequeueAndStart(ctx context.Context, app string, path string) { func (s *stats) DequeueAndStart(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.queue-- s.queue--
s.getStatsForFunction(path).queue--
common.DecrementGauge(ctx, queuedMetricName)
s.running++ s.running++
s.getStatsForFunction(path).running++ fstats.queue--
common.IncrementGauge(ctx, runningMetricName) fstats.running++
s.mu.Unlock() s.mu.Unlock()
common.DecrementGauge(ctx, queuedMetricName)
common.IncrementGauge(ctx, runningMetricName)
} }
func (s *stats) Complete(ctx context.Context, app string, path string) { func (s *stats) Complete(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.running-- s.running--
s.getStatsForFunction(path).running--
common.DecrementGauge(ctx, runningMetricName)
s.complete++ s.complete++
s.getStatsForFunction(path).complete++ fstats.running--
common.IncrementCounter(ctx, completedMetricName) fstats.complete++
s.mu.Unlock() s.mu.Unlock()
common.DecrementGauge(ctx, runningMetricName)
common.IncrementCounter(ctx, completedMetricName)
} }
func (s *stats) Failed(ctx context.Context, app string, path string) { func (s *stats) Failed(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.running-- s.running--
s.getStatsForFunction(path).running--
common.DecrementGauge(ctx, runningMetricName)
s.failed++ s.failed++
s.getStatsForFunction(path).failed++ fstats.running--
common.IncrementCounter(ctx, failedMetricName) fstats.failed++
s.mu.Unlock() s.mu.Unlock()
common.DecrementGauge(ctx, runningMetricName)
common.IncrementCounter(ctx, failedMetricName)
} }
func (s *stats) DequeueAndFail(ctx context.Context, app string, path string) { func (s *stats) DequeueAndFail(ctx context.Context, app string, path string) {
s.mu.Lock() s.mu.Lock()
fstats := s.getStatsForFunction(path)
s.queue-- s.queue--
s.getStatsForFunction(path).queue--
common.DecrementGauge(ctx, queuedMetricName)
s.failed++ s.failed++
s.getStatsForFunction(path).failed++ fstats.queue--
common.IncrementCounter(ctx, failedMetricName) fstats.failed++
s.mu.Unlock() s.mu.Unlock()
common.DecrementGauge(ctx, queuedMetricName)
common.IncrementCounter(ctx, failedMetricName)
} }
func (s *stats) IncrementTimedout(ctx context.Context) { func IncrementTimedout(ctx context.Context) {
common.IncrementCounter(ctx, timedoutMetricName) common.IncrementCounter(ctx, timedoutMetricName)
} }
func (s *stats) IncrementErrors(ctx context.Context) { func IncrementErrors(ctx context.Context) {
common.IncrementCounter(ctx, errorsMetricName) common.IncrementCounter(ctx, errorsMetricName)
} }
func (s *stats) IncrementTooBusy(ctx context.Context) { func IncrementTooBusy(ctx context.Context) {
common.IncrementCounter(ctx, serverBusyMetricName) common.IncrementCounter(ctx, serverBusyMetricName)
} }

View File

@@ -2,6 +2,8 @@ package common
import ( import (
"context" "context"
"time"
"github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log" "github.com/opentracing/opentracing-go/log"
) )
@@ -106,6 +108,14 @@ func PublishHistogramToSpan(span opentracing.Span, key string, value float64) {
span.LogFields(log.Float64(fieldname, value)) span.LogFields(log.Float64(fieldname, value))
} }
// PublishElapsedTimeToSpan publishes the specifed histogram elapsed time since start
// It does this by logging an appropriate field value to a tracing span
// Use this when the current tracing span is long-lived and you want the metric to be visible before it ends
func PublishElapsedTimeHistogram(ctx context.Context, key string, start, end time.Time) {
elapsed := float64(end.Sub(start).Seconds())
PublishHistogram(ctx, key, elapsed)
}
const ( const (
// FnPrefix is a constant for "fn_", used as a prefix for span names, field names, Prometheus metric names and Prometheus label names // FnPrefix is a constant for "fn_", used as a prefix for span names, field names, Prometheus metric names and Prometheus label names