diff --git a/api/agent/agent.go b/api/agent/agent.go index 4c950f8af..25f0143ed 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -3,10 +3,7 @@ package agent import ( "context" "io" - "math" "net/http" - "os" - "strconv" "strings" "sync" "time" @@ -98,6 +95,7 @@ type Agent interface { } type agent struct { + cfg AgentConfig da DataAccess callListeners []fnext.CallListener @@ -112,43 +110,31 @@ type agent struct { shutonce sync.Once shutdown chan struct{} - freezeIdleMsecs time.Duration - ejectIdleMsecs time.Duration - // Prometheus HTTP handler promHandler http.Handler } func New(da DataAccess) Agent { + + cfg, err := NewAgentConfig() + if err != nil { + logrus.WithField("cfg", cfg).WithError(err).Fatal("error in agent config") + } + logrus.WithField("cfg", cfg).Info("agent starting") + // TODO: Create drivers.New(runnerConfig) driver := docker.NewDocker(drivers.Config{ ServerVersion: "17.06.0-ce", }) - freezeIdleMsecs, err := getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond) - if err != nil { - logrus.WithError(err).Fatal("error initializing freeze idle delay") - } - - ejectIdleMsecs, err := getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond) - if err != nil { - logrus.WithError(err).Fatal("error initializing eject idle delay") - } - if ejectIdleMsecs == time.Duration(0) { - logrus.Fatal("eject idle delay cannot be zero") - } - - logrus.WithFields(logrus.Fields{"eject_msec": ejectIdleMsecs, "free_msec": freezeIdleMsecs}).Info("agent starting") - a := &agent{ - da: da, - driver: driver, - slotMgr: NewSlotQueueMgr(), - resources: NewResourceTracker(), - shutdown: make(chan struct{}), - freezeIdleMsecs: freezeIdleMsecs, - ejectIdleMsecs: ejectIdleMsecs, - promHandler: promhttp.Handler(), + cfg: *cfg, + da: da, + driver: driver, + slotMgr: NewSlotQueueMgr(), + resources: NewResourceTracker(), + shutdown: make(chan struct{}), + promHandler: promhttp.Handler(), } // TODO assert that agent doesn't get started for API nodes up above ? @@ -158,26 +144,6 @@ func New(da DataAccess) Agent { return a } -func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) { - - delay := defaultVal - - if dur := os.Getenv(name); dur != "" { - durInt, err := strconv.ParseInt(dur, 10, 64) - if err != nil { - return defaultVal, err - } - // disable if negative or set to msecs specified. - if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond { - delay = math.MaxInt64 - } else { - delay = time.Duration(durInt) * time.Millisecond - } - } - - return delay, nil -} - // TODO shuffle this around somewhere else (maybe) func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { return a.da.Enqueue(ctx, call) @@ -747,9 +713,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, var err error isFrozen := false - freezeTimer := time.NewTimer(a.freezeIdleMsecs) + freezeTimer := time.NewTimer(a.cfg.FreezeIdleMsecs) idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second) - ejectTicker := time.NewTicker(a.ejectIdleMsecs) + ejectTicker := time.NewTicker(a.cfg.EjectIdleMsecs) defer freezeTimer.Stop() defer idleTimer.Stop() @@ -763,7 +729,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, }() // if an immediate freeze is requested, freeze first before enqueuing at all. - if a.freezeIdleMsecs == time.Duration(0) && !isFrozen { + if a.cfg.FreezeIdleMsecs == time.Duration(0) && !isFrozen { err = cookie.Freeze(ctx) if err != nil { return false @@ -846,14 +812,10 @@ type container struct { } func (c *container) swap(stdin io.Reader, stdout, stderr io.Writer, cs *drivers.Stats) func() { - ostdin := c.stdin.(*ghostReader).inner - ostdout := c.stdout.(*ghostWriter).inner - ostderr := c.stderr.(*ghostWriter).inner - // if tests don't catch this, then fuck me - c.stdin.(*ghostReader).swap(stdin) - c.stdout.(*ghostWriter).swap(stdout) - c.stderr.(*ghostWriter).swap(stderr) + ostdin := c.stdin.(*ghostReader).swap(stdin) + ostdout := c.stdout.(*ghostWriter).swap(stdout) + ostderr := c.stderr.(*ghostWriter).swap(stderr) c.statsMu.Lock() ocs := c.stats @@ -947,11 +909,13 @@ type ghostReader struct { closed bool } -func (g *ghostReader) swap(r io.Reader) { +func (g *ghostReader) swap(r io.Reader) (old io.Reader) { g.cond.L.Lock() + old = g.inner g.inner = r g.cond.L.Unlock() g.cond.Broadcast() + return old } func (g *ghostReader) Close() { diff --git a/api/agent/config.go b/api/agent/config.go new file mode 100644 index 000000000..f6e0f17e3 --- /dev/null +++ b/api/agent/config.go @@ -0,0 +1,60 @@ +package agent + +import ( + "errors" + "math" + "os" + "strconv" + "time" +) + +type AgentConfig struct { + MinDockerVersion string `json:"min_docker_version"` + FreezeIdleMsecs time.Duration `json:"freeze_idle_msecs"` + EjectIdleMsecs time.Duration `json:"eject_idle_msecs"` +} + +func NewAgentConfig() (*AgentConfig, error) { + + var err error + + cfg := &AgentConfig{ + MinDockerVersion: "17.06.0-ce", + } + + cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond) + if err != nil { + return cfg, errors.New("error initializing freeze idle delay") + } + + cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond) + if err != nil { + return cfg, errors.New("error initializing eject idle delay") + } + + if cfg.EjectIdleMsecs == time.Duration(0) { + return cfg, errors.New("error eject idle delay cannot be zero") + } + + return cfg, nil +} + +func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) { + + delay := defaultVal + + if dur := os.Getenv(name); dur != "" { + durInt, err := strconv.ParseInt(dur, 10, 64) + if err != nil { + return defaultVal, err + } + // disable if negative or set to msecs specified. + if durInt < 0 || time.Duration(durInt) >= math.MaxInt64/time.Millisecond { + delay = math.MaxInt64 + } else { + delay = time.Duration(durInt) * time.Millisecond + } + } + + return delay, nil +}