mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* add user syslog writers to app

Users may now specify one or more syslog URLs on an app, and all functions under that app will send their logs to them. The docs (swagger and operating/logging.md) have more details; please review those. I tried to implement to spec in some parts and improve on it in others, and I am open to feedback on the format, since there is a lot of liberty there.

Design-wise, I am looking to the future and ignoring cold containers. The overhead of the connections there would not be worth it, so this feature only works for hot functions, since we're killing cold anyway (even if a user can just straight up exit a hot container).

Syslog connections are opened against a container when it starts up, and the call id that is logged gets swapped out for each call that goes through the container; this cuts down on the cost of opening/closing connections significantly. There are buffers to accumulate logs until we get a `\n` to actually write a syslog line, and a buffer to save some bytes when we're writing the syslog formatting as well. Underneath, writers re-use the line writer in certain scenarios (swapper). We could likely improve the ease of setting this up, but opening the syslog conns against a container seems worth it, and it is a different path than the other func loggers that we create when we make a call object. The Close() handling is a little tricky; I'm not sure how to make it easier and keep the benefits above, and I'm open to ideas.

This does add another vector of 'limits' to consider for stricter service operators: one is how many syslog URLs a user can add to an app (unlimited, at the moment), and the other is that, on the order of the number of containers per host, we could run out of connections in certain scenarios. There may be some utility in having multiple syslog sinks to send to: it can help with debugging to send to another destination, or when a user is a client of someone else and both want the function logs (I have used it for exactly that in the past).

This also doesn't work behind a proxy, which is something I'm open to fixing, but as far as I can tell it will require a third-party dependency (we can pretty much steal what docker does). That is mostly of use to those of us who work behind a proxy all the time, not really to end users.

There are some unit tests. Integration tests for this don't sound very fun to maintain. I did test against papertrail with each protocol and it works (and even times out if you're behind a proxy!).

closes #337

* add trace to syslog dial
371 lines
8.6 KiB
Go
371 lines
8.6 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha1"
|
|
"encoding/binary"
|
|
"hash"
|
|
"reflect"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
)
|
|
|
|
//
// slotQueueMgr keeps track of hot container slotQueues, where each slotQueue
// supports multiple consumers/producers. Each slotQueue also stores a few
// basic stats in slotQueueStats.
//
|
|
|
|
type Slot interface {
|
|
exec(ctx context.Context, call *call) error
|
|
Close(ctx context.Context) error
|
|
Error() error
|
|
}
|
|
|
|
// slotQueueMgr manages hot container slotQueues
|
|
type slotQueueMgr struct {
|
|
hMu sync.Mutex // protects hot
|
|
hot map[string]*slotQueue
|
|
}
|
|
|
|
// request and container states
|
|
type slotQueueStats struct {
|
|
requestStates [RequestStateMax]uint64
|
|
containerStates [ContainerStateMax]uint64
|
|
}
|
|
|
|
type slotToken struct {
|
|
slot Slot
|
|
trigger chan struct{}
|
|
id uint64
|
|
isBusy uint32
|
|
}
|
|
|
|
// LIFO queue that exposes input/output channels along
|
|
// with runner/waiter tracking for agent
|
|
type slotQueue struct {
|
|
key string
|
|
cond *sync.Cond
|
|
slots []*slotToken
|
|
nextId uint64
|
|
signaller chan chan error
|
|
statsLock sync.Mutex // protects stats below
|
|
stats slotQueueStats
|
|
}
|
|
|
|
func NewSlotQueueMgr() *slotQueueMgr {
|
|
obj := &slotQueueMgr{
|
|
hot: make(map[string]*slotQueue),
|
|
}
|
|
return obj
|
|
}
|
|
|
|
func NewSlotQueue(key string) *slotQueue {
|
|
obj := &slotQueue{
|
|
key: key,
|
|
cond: sync.NewCond(new(sync.Mutex)),
|
|
slots: make([]*slotToken, 0),
|
|
signaller: make(chan chan error, 1),
|
|
}
|
|
|
|
return obj
|
|
}
|
|
|
|
func (a *slotQueue) acquireSlot(s *slotToken) bool {
|
|
// let's get the lock
|
|
if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) {
|
|
return false
|
|
}
|
|
|
|
a.cond.L.Lock()
|
|
// common case: acquired slots are usually at the end
|
|
for i := len(a.slots) - 1; i >= 0; i-- {
|
|
if a.slots[i].id == s.id {
|
|
a.slots = append(a.slots[:i], a.slots[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
a.cond.L.Unlock()
|
|
|
|
// now we have the lock, push the trigger
|
|
close(s.trigger)
|
|
return true
|
|
}
|
|
|
|
func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken {
|
|
|
|
isWaiting := false
|
|
output := make(chan *slotToken)
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
a.cond.L.Lock()
|
|
if isWaiting {
|
|
a.cond.Broadcast()
|
|
}
|
|
a.cond.L.Unlock()
|
|
}()
|
|
|
|
go func() {
|
|
for {
|
|
a.cond.L.Lock()
|
|
|
|
isWaiting = true
|
|
for len(a.slots) <= 0 && (ctx.Err() == nil) {
|
|
a.cond.Wait()
|
|
}
|
|
isWaiting = false
|
|
|
|
if ctx.Err() != nil {
|
|
a.cond.L.Unlock()
|
|
return
|
|
}
|
|
|
|
item := a.slots[len(a.slots)-1]
|
|
a.cond.L.Unlock()
|
|
|
|
select {
|
|
case output <- item: // good case (dequeued)
|
|
case <-item.trigger: // ejected (eject handles cleanup)
|
|
case <-ctx.Done(): // time out or cancel from caller
|
|
}
|
|
}
|
|
}()
|
|
|
|
return output
|
|
}
|
|
|
|
func (a *slotQueue) queueSlot(slot Slot) *slotToken {
|
|
|
|
token := &slotToken{slot, make(chan struct{}), 0, 0}
|
|
|
|
a.cond.L.Lock()
|
|
token.id = a.nextId
|
|
a.slots = append(a.slots, token)
|
|
a.nextId += 1
|
|
a.cond.L.Unlock()
|
|
|
|
a.cond.Broadcast()
|
|
return token
|
|
}
|
|
|
|
// isIdle() returns true is there's no activity for this slot queue. This
|
|
// means no one is waiting, running or starting.
|
|
func (a *slotQueue) isIdle() bool {
|
|
var isIdle bool
|
|
|
|
a.statsLock.Lock()
|
|
|
|
isIdle = a.stats.requestStates[RequestStateWait] == 0 &&
|
|
a.stats.requestStates[RequestStateExec] == 0 &&
|
|
a.stats.containerStates[ContainerStateWait] == 0 &&
|
|
a.stats.containerStates[ContainerStateStart] == 0 &&
|
|
a.stats.containerStates[ContainerStateIdle] == 0 &&
|
|
a.stats.containerStates[ContainerStateBusy] == 0
|
|
|
|
a.statsLock.Unlock()
|
|
|
|
return isIdle
|
|
}
|
|
|
|
func (a *slotQueue) getStats() slotQueueStats {
|
|
var out slotQueueStats
|
|
a.statsLock.Lock()
|
|
out = a.stats
|
|
a.statsLock.Unlock()
|
|
return out
|
|
}
|
|
|
|
func isNewContainerNeeded(cur *slotQueueStats) bool {
|
|
|
|
idleWorkers := cur.containerStates[ContainerStateIdle]
|
|
starters := cur.containerStates[ContainerStateStart]
|
|
startWaiters := cur.containerStates[ContainerStateWait]
|
|
|
|
queuedRequests := cur.requestStates[RequestStateWait]
|
|
|
|
// we expect idle containers to immediately pick up
|
|
// any waiters. We assume non-idle containers busy.
|
|
effectiveWaiters := uint64(0)
|
|
if idleWorkers < queuedRequests {
|
|
effectiveWaiters = queuedRequests - idleWorkers
|
|
}
|
|
|
|
if effectiveWaiters == 0 {
|
|
return false
|
|
}
|
|
|
|
// we expect resource waiters to eventually transition
|
|
// into starters.
|
|
effectiveStarters := starters + startWaiters
|
|
|
|
// if containers are starting, do not start more than effective waiters
|
|
if effectiveStarters > 0 && effectiveStarters >= effectiveWaiters {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (a *slotQueue) enterRequestState(reqType RequestStateType) {
|
|
if reqType > RequestStateNone && reqType < RequestStateMax {
|
|
a.statsLock.Lock()
|
|
a.stats.requestStates[reqType] += 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (a *slotQueue) exitRequestState(reqType RequestStateType) {
|
|
if reqType > RequestStateNone && reqType < RequestStateMax {
|
|
a.statsLock.Lock()
|
|
a.stats.requestStates[reqType] -= 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (a *slotQueue) enterContainerState(conType ContainerStateType) {
|
|
if conType > ContainerStateNone && conType < ContainerStateMax {
|
|
a.statsLock.Lock()
|
|
a.stats.containerStates[conType] += 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
}
|
|
|
|
func (a *slotQueue) exitContainerState(conType ContainerStateType) {
|
|
if conType > ContainerStateNone && conType < ContainerStateMax {
|
|
a.statsLock.Lock()
|
|
a.stats.containerStates[conType] -= 1
|
|
a.statsLock.Unlock()
|
|
}
|
|
}
|
|
|
|
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
|
// a container will be locked up forever waiting for slot to free.
|
|
func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
|
|
|
|
key := getSlotQueueKey(call)
|
|
|
|
a.hMu.Lock()
|
|
slots, ok := a.hot[key]
|
|
if !ok {
|
|
slots = NewSlotQueue(key)
|
|
a.hot[key] = slots
|
|
}
|
|
a.hMu.Unlock()
|
|
|
|
return slots, !ok
|
|
}
|
|
|
|
// currently unused. But at some point, we need to age/delete old
|
|
// slotQueues.
|
|
func (a *slotQueueMgr) deleteSlotQueue(slots *slotQueue) bool {
|
|
isDeleted := false
|
|
|
|
a.hMu.Lock()
|
|
if slots.isIdle() {
|
|
delete(a.hot, slots.key)
|
|
isDeleted = true
|
|
}
|
|
a.hMu.Unlock()
|
|
|
|
return isDeleted
|
|
}
|
|
|
|
// shapool recycles SHA-1 hashers so that computing a slot-queue key does not
// allocate a new hash state each time. Values taken from the pool must be
// Reset before use.
var shapool = &sync.Pool{
	New: func() interface{} {
		return sha1.New()
	},
}
|
|
|
|
// TODO do better; once we have app+route versions this function
|
|
// can be simply app+route names & version
|
|
func getSlotQueueKey(call *call) string {
|
|
// return a sha1 hash of a (hopefully) unique string of all the config
|
|
// values, to make map lookups quicker [than the giant unique string]
|
|
|
|
hash := shapool.Get().(hash.Hash)
|
|
hash.Reset()
|
|
defer shapool.Put(hash)
|
|
|
|
hash.Write(unsafeBytes(call.AppID))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
hash.Write(unsafeBytes(call.SyslogURL))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
hash.Write(unsafeBytes(call.Path))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
hash.Write(unsafeBytes(call.Image))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
hash.Write(unsafeBytes(call.Format))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
|
|
// these are all static in size we only need to delimit the whole block of them
|
|
var byt [8]byte
|
|
binary.LittleEndian.PutUint32(byt[:4], uint32(call.Timeout))
|
|
hash.Write(byt[:4])
|
|
|
|
binary.LittleEndian.PutUint32(byt[:4], uint32(call.IdleTimeout))
|
|
hash.Write(byt[:4])
|
|
|
|
binary.LittleEndian.PutUint64(byt[:], call.Memory)
|
|
hash.Write(byt[:])
|
|
|
|
binary.LittleEndian.PutUint64(byt[:], uint64(call.CPUs))
|
|
hash.Write(byt[:])
|
|
hash.Write(unsafeBytes("\x00"))
|
|
|
|
// we have to sort these before printing, yay.
|
|
// TODO if we had a max size for config const we could avoid this!
|
|
keys := make([]string, 0, len(call.Config))
|
|
for k := range call.Config {
|
|
i := sort.SearchStrings(keys, k)
|
|
keys = append(keys, "")
|
|
copy(keys[i+1:], keys[i:])
|
|
keys[i] = k
|
|
}
|
|
|
|
for _, k := range keys {
|
|
hash.Write(unsafeBytes(k))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
hash.Write(unsafeBytes(call.Config[k]))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
}
|
|
|
|
// we need to additionally delimit config and annotations to eliminate overlap bug
|
|
hash.Write(unsafeBytes("\x00"))
|
|
|
|
keys = keys[:0] // clear keys
|
|
for k := range call.Annotations {
|
|
i := sort.SearchStrings(keys, k)
|
|
keys = append(keys, "")
|
|
copy(keys[i+1:], keys[i:])
|
|
keys[i] = k
|
|
}
|
|
|
|
for _, k := range keys {
|
|
hash.Write(unsafeBytes(k))
|
|
hash.Write(unsafeBytes("\x00"))
|
|
v, _ := call.Annotations.Get(k)
|
|
hash.Write(v)
|
|
hash.Write(unsafeBytes("\x00"))
|
|
}
|
|
|
|
var buf [sha1.Size]byte
|
|
hash.Sum(buf[:0])
|
|
return string(buf[:])
|
|
}
|
|
|
|
// unsafeBytes returns a []byte that aliases the bytes of a without copying.
// The result has len and cap equal to len(a).
//
// WARN: this is read only. The returned slice shares memory with the string,
// so it must never be written to.
func unsafeBytes(a string) []byte {
	strHeader := (*reflect.StringHeader)(unsafe.Pointer(&a))

	var b []byte
	byteHeader := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	byteHeader.Data = strHeader.Data

	// need to take the length of `a` here to ensure it's alive until after we update b's Data
	// field since the garbage collector can collect a variable once it is no longer used
	// not when it goes out of scope, for more details see https://github.com/golang/go/issues/9046
	l := len(a)
	byteHeader.Len = l
	byteHeader.Cap = l
	return b
}
|