mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Latency stats are not always read-time updated and if calls are stuck in waiting state, isNewContainerNeeded() needs to be a bit more aggresive if the wait queue grows.
354 lines
8.2 KiB
Go
354 lines
8.2 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha1"
|
|
"fmt"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
//
|
|
// slotQueueMgr keeps track of hot container slotQueues where each slotQueue
|
|
// provides for multiple consumers/producers. slotQueue also stores
|
|
// a few basic stats in slotStats.
|
|
//
|
|
|
|
type Slot interface {
|
|
exec(ctx context.Context, call *call) error
|
|
Close() error
|
|
Error() error
|
|
}
|
|
|
|
// slotQueueMgr manages hot container slotQueues
|
|
type slotQueueMgr struct {
|
|
hMu sync.Mutex // protects hot
|
|
hot map[string]*slotQueue
|
|
}
|
|
|
|
type SlotQueueMetricType int
|
|
|
|
const (
|
|
SlotQueueRunner SlotQueueMetricType = iota
|
|
SlotQueueStarter
|
|
SlotQueueWaiter
|
|
SlotQueueLast
|
|
)
|
|
|
|
// counters per state and moving avg of time spent in each state
|
|
type slotQueueStats struct {
|
|
states [SlotQueueLast]uint64
|
|
latencyCount [SlotQueueLast]uint64
|
|
latencies [SlotQueueLast]uint64
|
|
}
|
|
|
|
type slotToken struct {
|
|
slot Slot
|
|
trigger chan struct{}
|
|
id uint64
|
|
isBusy uint32
|
|
}
|
|
|
|
// LIFO queue that exposes input/output channels along
|
|
// with runner/waiter tracking for agent
|
|
type slotQueue struct {
|
|
key string
|
|
cond *sync.Cond
|
|
slots []*slotToken
|
|
nextId uint64
|
|
signaller chan bool
|
|
statsLock sync.Mutex // protects stats below
|
|
stats slotQueueStats
|
|
}
|
|
|
|
func NewSlotQueueMgr() *slotQueueMgr {
|
|
obj := &slotQueueMgr{
|
|
hot: make(map[string]*slotQueue),
|
|
}
|
|
return obj
|
|
}
|
|
|
|
func NewSlotQueue(key string) *slotQueue {
|
|
obj := &slotQueue{
|
|
key: key,
|
|
cond: sync.NewCond(new(sync.Mutex)),
|
|
slots: make([]*slotToken, 0),
|
|
signaller: make(chan bool, 1),
|
|
}
|
|
|
|
return obj
|
|
}
|
|
|
|
func (a *slotToken) acquireSlot() bool {
|
|
// let's get the lock
|
|
if !atomic.CompareAndSwapUint32(&a.isBusy, 0, 1) {
|
|
return false
|
|
}
|
|
|
|
// now we have the lock, push the trigger
|
|
close(a.trigger)
|
|
return true
|
|
}
|
|
|
|
func (a *slotQueue) ejectSlot(s *slotToken) bool {
|
|
// let's get the lock
|
|
if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) {
|
|
return false
|
|
}
|
|
|
|
a.cond.L.Lock()
|
|
for i := 0; i < len(a.slots); i++ {
|
|
if a.slots[i].id == s.id {
|
|
a.slots = append(a.slots[:i], a.slots[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
a.cond.L.Unlock()
|
|
|
|
s.slot.Close()
|
|
// now we have the lock, push the trigger
|
|
close(s.trigger)
|
|
return true
|
|
}
|
|
|
|
func (a *slotQueue) startDequeuer(ctx context.Context) (chan *slotToken, context.CancelFunc) {
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
myCancel := func() {
|
|
cancel()
|
|
a.cond.Broadcast()
|
|
}
|
|
|
|
output := make(chan *slotToken)
|
|
|
|
go func() {
|
|
for {
|
|
a.cond.L.Lock()
|
|
for len(a.slots) <= 0 && (ctx.Err() == nil) {
|
|
a.cond.Wait()
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
a.cond.L.Unlock()
|
|
return
|
|
}
|
|
|
|
// pop
|
|
item := a.slots[len(a.slots)-1]
|
|
a.slots = a.slots[:len(a.slots)-1]
|
|
a.cond.L.Unlock()
|
|
|
|
select {
|
|
case output <- item: // good case (dequeued)
|
|
case <-item.trigger: // ejected (eject handles cleanup)
|
|
case <-ctx.Done(): // time out or cancel from caller
|
|
// consume slot, we let the hot container queue the slot again
|
|
if item.acquireSlot() {
|
|
item.slot.Close()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return output, myCancel
|
|
}
|
|
|
|
func (a *slotQueue) queueSlot(slot Slot) *slotToken {
|
|
|
|
token := &slotToken{slot, make(chan struct{}), 0, 0}
|
|
|
|
a.cond.L.Lock()
|
|
token.id = a.nextId
|
|
a.slots = append(a.slots, token)
|
|
a.nextId += 1
|
|
a.cond.L.Unlock()
|
|
|
|
a.cond.Broadcast()
|
|
return token
|
|
}
|
|
|
|
// isIdle() returns true is there's no activity for this slot queue. This
|
|
// means no one is waiting, running or starting.
|
|
func (a *slotQueue) isIdle() bool {
|
|
var partySize uint64
|
|
|
|
a.statsLock.Lock()
|
|
partySize = a.stats.states[SlotQueueWaiter] + a.stats.states[SlotQueueStarter] + a.stats.states[SlotQueueRunner]
|
|
a.statsLock.Unlock()
|
|
|
|
return partySize == 0
|
|
}
|
|
|
|
func (a *slotQueue) getStats() slotQueueStats {
|
|
var out slotQueueStats
|
|
a.statsLock.Lock()
|
|
out = a.stats
|
|
a.statsLock.Unlock()
|
|
return out
|
|
}
|
|
|
|
func (a *slotQueue) isNewContainerNeeded() (bool, slotQueueStats) {
|
|
|
|
stats := a.getStats()
|
|
|
|
waiters := stats.states[SlotQueueWaiter]
|
|
if waiters == 0 {
|
|
return false, stats
|
|
}
|
|
|
|
// while a container is starting, do not start more than waiters
|
|
starters := stats.states[SlotQueueStarter]
|
|
if starters >= waiters {
|
|
return false, stats
|
|
}
|
|
|
|
// this is a bit aggresive and assumes that we only
|
|
// want to queue as much as num of containers.
|
|
executors := starters + stats.states[SlotQueueRunner]
|
|
if executors < waiters {
|
|
return true, stats
|
|
}
|
|
|
|
// WARNING: Below is a few heuristics that are
|
|
// speculative, which may (and will) likely need
|
|
// adjustments.
|
|
|
|
// WARNING: latencies below are updated after a call
|
|
// switches to/from different states. Do not assume
|
|
// the metrics below are always up-to-date. For example,
|
|
// a sudden burst of incoming requests will increase
|
|
// waiter count but not necessarily wait latency until
|
|
// those requests switch from waiter state.
|
|
|
|
runLat := stats.latencies[SlotQueueRunner]
|
|
waitLat := stats.latencies[SlotQueueWaiter]
|
|
startLat := stats.latencies[SlotQueueStarter]
|
|
|
|
// no wait latency? No need to spin up new container
|
|
if waitLat == 0 {
|
|
return false, stats
|
|
}
|
|
|
|
// this determines the aggresiveness of the container launch.
|
|
if runLat/executors*2 < waitLat {
|
|
return true, stats
|
|
}
|
|
if runLat < waitLat {
|
|
return true, stats
|
|
}
|
|
if startLat < waitLat {
|
|
return true, stats
|
|
}
|
|
|
|
return false, stats
|
|
}
|
|
|
|
func (a *slotQueue) enterState(metricIdx SlotQueueMetricType) {
|
|
a.statsLock.Lock()
|
|
a.stats.states[metricIdx] += 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
|
|
func (a *slotQueue) exitState(metricIdx SlotQueueMetricType) {
|
|
a.statsLock.Lock()
|
|
if a.stats.states[metricIdx] == 0 {
|
|
panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx))
|
|
}
|
|
a.stats.states[metricIdx] -= 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
|
|
func (a *slotQueue) recordLatencyLocked(metricIdx SlotQueueMetricType, latency uint64) {
|
|
// exponentially weighted moving average with smoothing factor of 0.5
|
|
// 0.5 is a high value to age older observations fast while filtering
|
|
// some noise. For our purposes, newer observations are much more important
|
|
// than older, but we still would like to low pass some noise.
|
|
// first samples are ignored.
|
|
if a.stats.latencyCount[metricIdx] != 0 {
|
|
a.stats.latencies[metricIdx] = (a.stats.latencies[metricIdx]*5 + latency*5) / 10
|
|
}
|
|
a.stats.latencyCount[metricIdx] += 1
|
|
if a.stats.latencyCount[metricIdx] == 0 {
|
|
a.stats.latencyCount[metricIdx] += 1
|
|
}
|
|
}
|
|
|
|
func (a *slotQueue) recordLatency(metricIdx SlotQueueMetricType, latency uint64) {
|
|
a.statsLock.Lock()
|
|
a.recordLatencyLocked(metricIdx, latency)
|
|
a.statsLock.Unlock()
|
|
}
|
|
|
|
func (a *slotQueue) exitStateWithLatency(metricIdx SlotQueueMetricType, latency uint64) {
|
|
a.statsLock.Lock()
|
|
if a.stats.states[metricIdx] == 0 {
|
|
panic(fmt.Sprintf("BUG: metric tracking fault idx=%v", metricIdx))
|
|
}
|
|
a.stats.states[metricIdx] -= 1
|
|
a.recordLatencyLocked(metricIdx, latency)
|
|
a.statsLock.Unlock()
|
|
}
|
|
|
|
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
|
// a container will be locked up forever waiting for slot to free.
|
|
func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
|
|
|
|
key := getSlotQueueKey(call)
|
|
|
|
a.hMu.Lock()
|
|
slots, ok := a.hot[key]
|
|
if !ok {
|
|
slots = NewSlotQueue(key)
|
|
a.hot[key] = slots
|
|
}
|
|
slots.enterState(SlotQueueWaiter)
|
|
a.hMu.Unlock()
|
|
|
|
return slots, !ok
|
|
}
|
|
|
|
// currently unused. But at some point, we need to age/delete old
|
|
// slotQueues.
|
|
func (a *slotQueueMgr) deleteSlotQueue(slots *slotQueue) bool {
|
|
isDeleted := false
|
|
|
|
a.hMu.Lock()
|
|
if slots.isIdle() {
|
|
delete(a.hot, slots.key)
|
|
isDeleted = true
|
|
}
|
|
a.hMu.Unlock()
|
|
|
|
return isDeleted
|
|
}
|
|
|
|
func getSlotQueueKey(call *call) string {
|
|
// return a sha1 hash of a (hopefully) unique string of all the config
|
|
// values, to make map lookups quicker [than the giant unique string]
|
|
|
|
hash := sha1.New()
|
|
fmt.Fprint(hash, call.AppName, "\x00")
|
|
fmt.Fprint(hash, call.Path, "\x00")
|
|
fmt.Fprint(hash, call.Image, "\x00")
|
|
fmt.Fprint(hash, call.Timeout, "\x00")
|
|
fmt.Fprint(hash, call.IdleTimeout, "\x00")
|
|
fmt.Fprint(hash, call.Memory, "\x00")
|
|
fmt.Fprint(hash, call.Format, "\x00")
|
|
|
|
// we have to sort these before printing, yay. TODO do better
|
|
keys := make([]string, 0, len(call.Config))
|
|
for k := range call.Config {
|
|
keys = append(keys, k)
|
|
}
|
|
|
|
sort.Strings(keys)
|
|
for _, k := range keys {
|
|
fmt.Fprint(hash, k, "\x00", call.Config[k], "\x00")
|
|
}
|
|
|
|
var buf [sha1.Size]byte
|
|
return string(hash.Sum(buf[:0]))
|
|
}
|