fn: introduce agent config and minor ghostreader tweak (#797)

* fn: introduce agent config and minor ghostreader tweak

TODO: move all constants/tweaks in agent to agent config.

* fn: json convention
This commit is contained in:
Tolga Ceylan
2018-02-27 12:17:13 -08:00
committed by GitHub
parent 46fad7ef80
commit 320b766a6d
2 changed files with 84 additions and 60 deletions

View File

@@ -3,10 +3,7 @@ package agent
import ( import (
"context" "context"
"io" "io"
"math"
"net/http" "net/http"
"os"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -98,6 +95,7 @@ type Agent interface {
} }
type agent struct { type agent struct {
cfg AgentConfig
da DataAccess da DataAccess
callListeners []fnext.CallListener callListeners []fnext.CallListener
@@ -112,43 +110,31 @@ type agent struct {
shutonce sync.Once shutonce sync.Once
shutdown chan struct{} shutdown chan struct{}
freezeIdleMsecs time.Duration
ejectIdleMsecs time.Duration
// Prometheus HTTP handler // Prometheus HTTP handler
promHandler http.Handler promHandler http.Handler
} }
func New(da DataAccess) Agent { func New(da DataAccess) Agent {
cfg, err := NewAgentConfig()
if err != nil {
logrus.WithField("cfg", cfg).WithError(err).Fatal("error in agent config")
}
logrus.WithField("cfg", cfg).Info("agent starting")
// TODO: Create drivers.New(runnerConfig) // TODO: Create drivers.New(runnerConfig)
driver := docker.NewDocker(drivers.Config{ driver := docker.NewDocker(drivers.Config{
ServerVersion: "17.06.0-ce", ServerVersion: "17.06.0-ce",
}) })
freezeIdleMsecs, err := getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
if err != nil {
logrus.WithError(err).Fatal("error initializing freeze idle delay")
}
ejectIdleMsecs, err := getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
if err != nil {
logrus.WithError(err).Fatal("error initializing eject idle delay")
}
if ejectIdleMsecs == time.Duration(0) {
logrus.Fatal("eject idle delay cannot be zero")
}
logrus.WithFields(logrus.Fields{"eject_msec": ejectIdleMsecs, "free_msec": freezeIdleMsecs}).Info("agent starting")
a := &agent{ a := &agent{
da: da, cfg: *cfg,
driver: driver, da: da,
slotMgr: NewSlotQueueMgr(), driver: driver,
resources: NewResourceTracker(), slotMgr: NewSlotQueueMgr(),
shutdown: make(chan struct{}), resources: NewResourceTracker(),
freezeIdleMsecs: freezeIdleMsecs, shutdown: make(chan struct{}),
ejectIdleMsecs: ejectIdleMsecs, promHandler: promhttp.Handler(),
promHandler: promhttp.Handler(),
} }
// TODO assert that agent doesn't get started for API nodes up above ? // TODO assert that agent doesn't get started for API nodes up above ?
@@ -158,26 +144,6 @@ func New(da DataAccess) Agent {
return a return a
} }
func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
delay := defaultVal
if dur := os.Getenv(name); dur != "" {
durInt, err := strconv.ParseInt(dur, 10, 64)
if err != nil {
return defaultVal, err
}
// disable if negative or set to msecs specified.
if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond {
delay = math.MaxInt64
} else {
delay = time.Duration(durInt) * time.Millisecond
}
}
return delay, nil
}
// TODO shuffle this around somewhere else (maybe) // TODO shuffle this around somewhere else (maybe)
func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { func (a *agent) Enqueue(ctx context.Context, call *models.Call) error {
return a.da.Enqueue(ctx, call) return a.da.Enqueue(ctx, call)
@@ -747,9 +713,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
var err error var err error
isFrozen := false isFrozen := false
freezeTimer := time.NewTimer(a.freezeIdleMsecs) freezeTimer := time.NewTimer(a.cfg.FreezeIdleMsecs)
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second) idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
ejectTicker := time.NewTicker(a.ejectIdleMsecs) ejectTicker := time.NewTicker(a.cfg.EjectIdleMsecs)
defer freezeTimer.Stop() defer freezeTimer.Stop()
defer idleTimer.Stop() defer idleTimer.Stop()
@@ -763,7 +729,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
}() }()
// if an immediate freeze is requested, freeze first before enqueuing at all. // if an immediate freeze is requested, freeze first before enqueuing at all.
if a.freezeIdleMsecs == time.Duration(0) && !isFrozen { if a.cfg.FreezeIdleMsecs == time.Duration(0) && !isFrozen {
err = cookie.Freeze(ctx) err = cookie.Freeze(ctx)
if err != nil { if err != nil {
return false return false
@@ -846,14 +812,10 @@ type container struct {
} }
func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() { func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() {
ostdin := c.stdin.(*ghostReader).inner
ostdout := c.stdout.(*ghostWriter).inner
ostderr := c.stderr.(*ghostWriter).inner
// if tests don't catch this, then fuck me // if tests don't catch this, then fuck me
c.stdin.(*ghostReader).swap(stdin) ostdin := c.stdin.(*ghostReader).swap(stdin)
c.stdout.(*ghostWriter).swap(stdout) ostdout := c.stdout.(*ghostWriter).swap(stdout)
c.stderr.(*ghostWriter).swap(stderr) ostderr := c.stderr.(*ghostWriter).swap(stderr)
c.statsMu.Lock() c.statsMu.Lock()
ocs := c.stats ocs := c.stats
@@ -947,11 +909,13 @@ type ghostReader struct {
closed bool closed bool
} }
func (g *ghostReader) swap(r io.Reader) { func (g *ghostReader) swap(r io.Reader) (old io.Reader) {
g.cond.L.Lock() g.cond.L.Lock()
old = g.inner
g.inner = r g.inner = r
g.cond.L.Unlock() g.cond.L.Unlock()
g.cond.Broadcast() g.cond.Broadcast()
return old
} }
func (g *ghostReader) Close() { func (g *ghostReader) Close() {

60
api/agent/config.go Normal file
View File

@@ -0,0 +1,60 @@
package agent
import (
"errors"
"math"
"os"
"strconv"
"time"
)
type AgentConfig struct {
MinDockerVersion string `json:"min_docker_version"`
FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"`
EjectIdleMsecs time.Duration `json:"eject_idle_msecs"`
}
func NewAgentConfig() (*AgentConfig, error) {
var err error
cfg := &AgentConfig{
MinDockerVersion: "17.06.0-ce",
}
cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
if err != nil {
return cfg, errors.New("error initializing freeze idle delay")
}
cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
if err != nil {
return cfg, errors.New("error initializing eject idle delay")
}
if cfg.EjectIdleMsecs == time.Duration(0) {
return cfg, errors.New("error eject idle delay cannot be zero")
}
return cfg, nil
}
func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
delay := defaultVal
if dur := os.Getenv(name); dur != "" {
durInt, err := strconv.ParseInt(dur, 10, 64)
if err != nil {
return defaultVal, err
}
// disable if negative or set to msecs specified.
if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond {
delay = math.MaxInt64
} else {
delay = time.Duration(durInt) * time.Millisecond
}
}
return delay, nil
}