From 74a51f3f88180020643daa43951f7ede3e5caf4d Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Tue, 13 Mar 2018 18:38:47 -0700
Subject: [PATCH] fn: reorg agent config (#853)

* fn: reorg agent config

*) Moving constants in agent to agent config, which helps with testing
and tuning.
*) Added max total cpu & memory for testing, and for clamping max mem
& cpu usage if needed.

* fn: adjust PipeIO time

* fn: for hot, cannot reliably test EndOfLogs in TestRouteRunnerExecution
---
 api/agent/agent.go         |  18 +++----
 api/agent/async.go         |   2 +-
 api/agent/config.go        | 105 ++++++++++++++++++++++---------
 api/agent/pure_runner.go   |   2 +-
 api/agent/resource.go      |  24 ++++++---
 api/agent/resource_test.go |   6 +--
 api/server/runner_test.go  |  16 +++---
 7 files changed, 105 insertions(+), 68 deletions(-)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index feca2f112..3464cb9ab 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -135,7 +135,7 @@ func NewSyncOnly(da DataAccess) Agent {
 		da:        da,
 		driver:    driver,
 		slotMgr:   NewSlotQueueMgr(),
-		resources: NewResourceTracker(),
+		resources: NewResourceTracker(cfg),
 		shutdown:  make(chan struct{}),
 	}
 
@@ -305,7 +305,7 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
 	// Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
 	// whichever is longer. If in this time, there's no activity, then
 	// we destroy the hot queue.
-	timeout := time.Duration(60) * time.Minute
+	timeout := a.cfg.HotLauncherTimeout
 	idleTimeout := time.Duration(call.IdleTimeout) * time.Second * 2
 	if timeout < idleTimeout {
 		timeout = idleTimeout
@@ -380,7 +380,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 	ch := call.slots.startDequeuer(ctx)
 	// 1) if we can get a slot immediately, grab it.
-	// 2) if we don't, send a signaller every 200ms until we do.
+	// 2) if we don't, send a signaller every x msecs until we do.
 
 	sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
 
 	for {
@@ -402,8 +402,8 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 			// ping dequeuer again
 		}
 
-		// set sleep to 200ms after first iteration
-		sleep = 200 * time.Millisecond
+		// set sleep to x msecs after first iteration
+		sleep = a.cfg.HotPoll
 		// send a notification to launchHot()
 		select {
 		case call.slots.signaller <- true:
@@ -631,7 +631,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
 
 	// if freezer is enabled, be consistent with freezer behavior and
 	// block stdout and stderr between calls.
-	isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdleMsecs
+	isBlockIdleIO := MaxDisabledMsecs != a.cfg.FreezeIdle
 
 	container, closer := NewHotContainer(call, isBlockIdleIO)
 	defer closer()
@@ -708,9 +708,9 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 	var err error
 	isFrozen := false
 
-	freezeTimer := time.NewTimer(a.cfg.FreezeIdleMsecs)
+	freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
 	idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
-	ejectTicker := time.NewTicker(a.cfg.EjectIdleMsecs)
+	ejectTicker := time.NewTicker(a.cfg.EjectIdle)
 
 	defer freezeTimer.Stop()
 	defer idleTimer.Stop()
@@ -724,7 +724,7 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 	}()
 
 	// if an immediate freeze is requested, freeze first before enqueuing at all.
-	if a.cfg.FreezeIdleMsecs == time.Duration(0) && !isFrozen {
+	if a.cfg.FreezeIdle == time.Duration(0) && !isFrozen {
 		err = cookie.Freeze(ctx)
 		if err != nil {
 			return false
diff --git a/api/agent/async.go b/api/agent/async.go
index 6a37fea93..5f60661a0 100644
--- a/api/agent/async.go
+++ b/api/agent/async.go
@@ -53,7 +53,7 @@ func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call {
 	ch := make(chan *models.Call, 1)
 
 	go func() {
-		ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+		ctx, cancel := context.WithTimeout(ctx, a.cfg.AsyncChewPoll)
 		defer cancel()
 
 		call, err := a.da.Dequeue(ctx)
diff --git a/api/agent/config.go b/api/agent/config.go
index a5730fa6b..97151ce71 100644
--- a/api/agent/config.go
+++ b/api/agent/config.go
@@ -1,7 +1,6 @@
 package agent
 
 import (
-	"errors"
 	"fmt"
 	"math"
 	"os"
@@ -10,75 +9,99 @@ import (
 )
 
 type AgentConfig struct {
-	MinDockerVersion string        `json:"min_docker_version"`
-	FreezeIdleMsecs  time.Duration `json:"freeze_idle_msecs"`
-	EjectIdleMsecs   time.Duration `json:"eject_idle_msecs"`
-	MaxResponseSize  uint64        `json:"max_response_size"`
-	MaxLogSize       uint64        `json:"max_log_size"`
+	MinDockerVersion   string        `json:"min_docker_version"`
+	FreezeIdle         time.Duration `json:"freeze_idle_msecs"`
+	EjectIdle          time.Duration `json:"eject_idle_msecs"`
+	HotPoll            time.Duration `json:"hot_poll_msecs"`
+	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
+	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
+	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
+	MaxLogSize         uint64        `json:"max_log_size_bytes"`
+	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
+	MaxTotalMemory     uint64        `json:"max_total_memory_bytes"`
 }
 
-var MaxDisabledMsecs = time.Duration(math.MaxInt64)
+const (
+	EnvFreezeIdle         = "FN_FREEZE_IDLE_MSECS"
+	EnvEjectIdle          = "FN_EJECT_IDLE_MSECS"
+	EnvHotPoll            = "FN_HOT_POLL_MSECS"
+	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
+	EnvAsyncChewPoll      = "FN_ASYNC_CHEW_POLL_MSECS"
+	EnvMaxResponseSize    = "FN_MAX_RESPONSE_SIZE"
+	EnvMaxLogSize         = "FN_MAX_LOG_SIZE_BYTES"
+	EnvMaxTotalCPU        = "FN_MAX_TOTAL_CPU_MCPUS"
+	EnvMaxTotalMemory     = "FN_MAX_TOTAL_MEMORY_BYTES"
+
+	MaxDisabledMsecs = time.Duration(math.MaxInt64)
+)
 
 func NewAgentConfig() (*AgentConfig, error) {
-	var err error
-
 	cfg := &AgentConfig{
 		MinDockerVersion: "17.06.0-ce",
 		MaxLogSize:       1 * 1024 * 1024,
 	}
 
-	cfg.FreezeIdleMsecs, err = getEnvMsecs("FN_FREEZE_IDLE_MSECS", 50*time.Millisecond)
+	var err error
+
+	err = setEnvMsecs(err, EnvFreezeIdle, &cfg.FreezeIdle, 50*time.Millisecond)
+	err = setEnvMsecs(err, EnvEjectIdle, &cfg.EjectIdle, 1000*time.Millisecond)
+	err = setEnvMsecs(err, EnvHotPoll, &cfg.HotPoll, 200*time.Millisecond)
+	err = setEnvMsecs(err, EnvHotLauncherTimeout, &cfg.HotLauncherTimeout, time.Duration(60)*time.Minute)
+	err = setEnvMsecs(err, EnvAsyncChewPoll, &cfg.AsyncChewPoll, time.Duration(60)*time.Second)
+	err = setEnvUint(err, EnvMaxResponseSize, &cfg.MaxResponseSize)
+	err = setEnvUint(err, EnvMaxLogSize, &cfg.MaxLogSize)
+	err = setEnvUint(err, EnvMaxTotalCPU, &cfg.MaxTotalCPU)
+	err = setEnvUint(err, EnvMaxTotalMemory, &cfg.MaxTotalMemory)
+
 	if err != nil {
-		return cfg, errors.New("error initializing freeze idle delay")
+		return cfg, err
 	}
 
-	if tmp := os.Getenv("FN_MAX_LOG_SIZE"); tmp != "" {
-		cfg.MaxLogSize, err = strconv.ParseUint(tmp, 10, 64)
-		if err != nil {
-			return cfg, errors.New("error initializing max log size")
-		}
+	if cfg.EjectIdle == time.Duration(0) {
+		return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
+	}
+
+	if cfg.MaxLogSize > math.MaxInt64 {
 		// for safety during uint64 to int conversions in Write()/Read(), etc.
-		if cfg.MaxLogSize > math.MaxInt32 {
-			return cfg, fmt.Errorf("error invalid max log size %v > %v", cfg.MaxLogSize, math.MaxInt32)
-		}
-	}
-
-	cfg.EjectIdleMsecs, err = getEnvMsecs("FN_EJECT_IDLE_MSECS", 1000*time.Millisecond)
-	if err != nil {
-		return cfg, errors.New("error initializing eject idle delay")
-	}
-
-	if cfg.EjectIdleMsecs == time.Duration(0) {
-		return cfg, errors.New("error eject idle delay cannot be zero")
-	}
-
-	if tmp := os.Getenv("FN_MAX_RESPONSE_SIZE"); tmp != "" {
-		cfg.MaxResponseSize, err = strconv.ParseUint(tmp, 10, 64)
-		if err != nil {
-			return cfg, errors.New("error initializing response buffer size")
-		}
+		return cfg, fmt.Errorf("error invalid %s %v > %v", EnvMaxLogSize, cfg.MaxLogSize, math.MaxInt64)
 	}
 
 	return cfg, nil
 }
 
-func getEnvMsecs(name string, defaultVal time.Duration) (time.Duration, error) {
+func setEnvUint(err error, name string, dst *uint64) error {
+	if err != nil {
+		return err
+	}
+	if tmp := os.Getenv(name); tmp != "" {
+		val, err := strconv.ParseUint(tmp, 10, 64)
+		if err != nil {
+			return fmt.Errorf("error invalid %s=%s", name, tmp)
+		}
+		*dst = val
+	}
+	return nil
+}
 
-	delay := defaultVal
+func setEnvMsecs(err error, name string, dst *time.Duration, defaultVal time.Duration) error {
+	if err != nil {
+		return err
+	}
+
+	*dst = defaultVal
 
 	if dur := os.Getenv(name); dur != "" {
 		durInt, err := strconv.ParseInt(dur, 10, 64)
 		if err != nil {
-			return defaultVal, err
+			return fmt.Errorf("error invalid %s=%s err=%s", name, dur, err)
 		}
 		// disable if negative or set to msecs specified.
 		if durInt < 0 || time.Duration(durInt) >= MaxDisabledMsecs/time.Millisecond {
-			delay = MaxDisabledMsecs
+			*dst = MaxDisabledMsecs
 		} else {
-			delay = time.Duration(durInt) * time.Millisecond
+			*dst = time.Duration(durInt) * time.Millisecond
 		}
 	}
 
-	return delay, nil
+	return nil
 }
diff --git a/api/agent/pure_runner.go b/api/agent/pure_runner.go
index 63bb3ae1c..3e778c966 100644
--- a/api/agent/pure_runner.go
+++ b/api/agent/pure_runner.go
@@ -615,6 +615,6 @@ const megabyte uint64 = 1024 * 1024
 
 func getAvailableMemoryUnits() uint64 {
 	// To reuse code - but it's a bit of a hack. TODO: refactor the OS-specific get memory funcs out of that.
-	throwawayRT := NewResourceTracker().(*resourceTracker)
+	throwawayRT := NewResourceTracker(nil).(*resourceTracker)
 	return throwawayRT.ramAsyncTotal / megabyte
 }
diff --git a/api/agent/resource.go b/api/agent/resource.go
index 248da0bf8..cd22191f6 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -74,14 +74,14 @@ type resourceTracker struct {
 	tokenWaiterCount uint64
 }
 
-func NewResourceTracker() ResourceTracker {
+func NewResourceTracker(cfg *AgentConfig) ResourceTracker {
 
 	obj := &resourceTracker{
 		cond: sync.NewCond(new(sync.Mutex)),
 	}
 
-	obj.initializeMemory()
-	obj.initializeCPU()
+	obj.initializeMemory(cfg)
+	obj.initializeCPU(cfg)
 
 	return obj
 }
@@ -295,7 +295,7 @@ func clampUint64(val, min, max uint64) uint64 {
 	return val
 }
 
-func (a *resourceTracker) initializeCPU() {
+func (a *resourceTracker) initializeCPU(cfg *AgentConfig) {
 
 	var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64
 	var totalCPU, availCPU uint64
@@ -320,6 +320,11 @@ func (a *resourceTracker) initializeCPU() {
 		availCPU = minUint64(availCPU, cgroupCPU)
 	}
 
+	// now based on cfg, further clamp on calculated values
+	if cfg != nil && cfg.MaxTotalCPU != 0 {
+		availCPU = minUint64(cfg.MaxTotalCPU, availCPU)
+	}
+
 	// TODO: check cgroup cpuset to clamp this further. We might be restricted into
 	// a subset of CPUs. (eg. /sys/fs/cgroup/cpuset/cpuset.effective_cpus)
 
@@ -360,7 +365,7 @@ func (a *resourceTracker) initializeCPU() {
 	a.cpuAsyncTotal = maxAsyncCPU
 }
 
-func (a *resourceTracker) initializeMemory() {
+func (a *resourceTracker) initializeMemory(cfg *AgentConfig) {
 
 	var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
 
@@ -389,6 +394,11 @@ func (a *resourceTracker) initializeMemory() {
 	}
 	availMemory = availMemory - headRoom
 
+	// now based on cfg, further clamp on calculated values
+	if cfg != nil && cfg.MaxTotalMemory != 0 {
+		availMemory = minUint64(cfg.MaxTotalMemory, availMemory)
+	}
+
 	logrus.WithFields(logrus.Fields{
 		"totalMemory": totalMemory,
 		"availMemory": availMemory,
@@ -420,9 +430,9 @@ func (a *resourceTracker) initializeMemory() {
 	}
 
 	if maxSyncMemory+maxAsyncMemory < 256*Mem1MB {
-		logrus.Warn("Severaly Limited memory: ramSync + ramAsync < 256MB")
+		logrus.Warn("Severely Limited memory: ramSync + ramAsync < 256MB")
 	} else if maxAsyncMemory < 256*Mem1MB {
-		logrus.Warn("Severaly Limited memory: ramAsync < 256MB")
+		logrus.Warn("Severely Limited memory: ramAsync < 256MB")
 	}
 
 	a.ramAsyncHWMark = ramAsyncHWMark
diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go
index 17a4cf44c..cb4527e4c 100644
--- a/api/agent/resource_test.go
+++ b/api/agent/resource_test.go
@@ -99,7 +99,7 @@ func TestResourceAsyncWait(t *testing.T) {
 
 	var vals trackerVals
 
-	trI := NewResourceTracker()
+	trI := NewResourceTracker(nil)
 
 	tr := trI.(*resourceTracker)
 
@@ -166,7 +166,7 @@ func TestResourceAsyncWait(t *testing.T) {
 
 func TestResourceGetSimple(t *testing.T) {
 	var vals trackerVals
-	trI := NewResourceTracker()
+	trI := NewResourceTracker(nil)
 	tr := trI.(*resourceTracker)
 
 	vals.setDefaults()
@@ -229,7 +229,7 @@ func TestResourceGetCombo(t *testing.T) {
 
 	var vals trackerVals
-	trI := NewResourceTracker()
+	trI := NewResourceTracker(nil)
 	tr := trI.(*resourceTracker)
 
 	vals.setDefaults()
diff --git a/api/server/runner_test.go b/api/server/runner_test.go
index 04c29b4cf..f35f8b9cc 100644
--- a/api/server/runner_test.go
+++ b/api/server/runner_test.go
@@ -154,7 +154,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 	// more timing related issues below. Slightly gains us a bit more
 	// determinism.
 	tweaker1 := envTweaker("FN_FREEZE_IDLE_MSECS", "0")
-	tweaker2 := envTweaker("FN_MAX_LOG_SIZE", "5")
+	tweaker2 := envTweaker("FN_MAX_LOG_SIZE_BYTES", "5")
 	defer tweaker1()
 	defer tweaker2()
 
@@ -187,7 +187,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 	// sleep between logs and with debug enabled, fn-test-utils will log header/footer below:
 	immediateGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 0}`
 	immediateJsonValidGarbage := `{"isDebug": true, "postOutGarbage": "\r", "postSleepTime": 0}`
-	delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1000}`
+	delayedGarbage := `{"isDebug": true, "postOutGarbage": "YOGURT_YOGURT_YOGURT", "postSleepTime": 1500}`
 	ok := `{"isDebug": true}`
 
 	containerIds := make([]string, 0)
@@ -211,7 +211,7 @@ func TestRouteRunnerIOPipes(t *testing.T) {
 		{"/r/zoo/json/", immediateGarbage, "GET", http.StatusOK, "", nil, 0},
 
 		// CASE II: delayed garbage: make sure delayed output lands in between request processing, should be blocked until next req
-		{"/r/zoo/json/", delayedGarbage, "GET", http.StatusOK, "", nil, time.Second * 2},
+		{"/r/zoo/json/", delayedGarbage, "GET", http.StatusOK, "", nil, time.Millisecond * 2500},
 
 		// CASE III: normal, but should get faulty I/O from previous
 		{"/r/zoo/json/", ok, "GET", http.StatusBadGateway, "invalid json", nil, 0},
@@ -355,7 +355,11 @@ func TestRouteRunnerExecution(t *testing.T) {
 
 	expHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"application/json; charset=utf-8"}}
 	expCTHeaders := map[string][]string{"X-Function": {"Test"}, "Content-Type": {"foo/bar"}}
-	multiLogExpect := []string{"BeginOfLogs", "EndOfLogs"}
+
+	// Checking for EndOfLogs currently depends on scheduling of go-routines (in docker/containerd) that process stderr & stdout.
+	// Therefore, not testing for EndOfLogs for hot containers (which has complex I/O processing) anymore.
+	multiLogExpectCold := []string{"BeginOfLogs", "EndOfLogs"}
+	multiLogExpectHot := []string{"BeginOfLogs" /*, "EndOfLogs" */}
 
 	crasher := `{"echoContent": "_TRX_ID_", "isDebug": true, "isCrash": true}` // crash container
 	oomer := `{"echoContent": "_TRX_ID_", "isDebug": true, "allocateMemory": 12000000}` // ask for 12MB
@@ -400,8 +404,8 @@ func TestRouteRunnerExecution(t *testing.T) {
 		{"/r/myapp/mydneregistry", ``, "GET", http.StatusInternalServerError, nil, "connection refused", nil},
 		{"/r/myapp/myoom", oomer, "GET", http.StatusBadGateway, nil, "container out of memory", nil},
 
-		{"/r/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
-		{"/r/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpect},
+		{"/r/myapp/myhot", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectHot},
+		{"/r/myapp/", multiLog, "GET", http.StatusOK, nil, "", multiLogExpectCold},
 		{"/r/myapp/mybigoutputjson", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
 		{"/r/myapp/mybigoutputjson", smalloutput, "GET", http.StatusOK, nil, "", nil},
 		{"/r/myapp/mybigoutputhttp", bigoutput, "GET", http.StatusBadGateway, nil, "function response too large", nil},
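
Usage sketch (not part of the commit): all of the new knobs are read from the
environment in NewAgentConfig(), so a test or deployment can exercise them
without code changes. A minimal sketch, assuming the usual
github.com/fnproject/fn/api/agent import path; the values below are
illustrative only:

	package main

	import (
		"fmt"
		"os"

		"github.com/fnproject/fn/api/agent"
	)

	func main() {
		// Durations are given in milliseconds; a negative value disables the
		// timer (it is mapped to MaxDisabledMsecs by setEnvMsecs).
		os.Setenv("FN_FREEZE_IDLE_MSECS", "-1") // never freeze idle hot containers
		os.Setenv("FN_HOT_POLL_MSECS", "500")   // signal the hot launcher every 500ms

		// Clamp the resource tracker below what the host actually has.
		os.Setenv("FN_MAX_TOTAL_CPU_MCPUS", "2000")          // 2 CPUs worth of mCPUs
		os.Setenv("FN_MAX_TOTAL_MEMORY_BYTES", "2147483648") // 2 GiB

		cfg, err := agent.NewAgentConfig()
		if err != nil {
			panic(err)
		}
		fmt.Printf("%+v\n", cfg)
	}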
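
Because setEnvMsecs/setEnvUint thread the previous error through their first
argument, the first invalid variable short-circuits the remaining setters and
NewAgentConfig returns that error. Passing nil to NewResourceTracker (as the
tests and getAvailableMemoryUnits do) simply skips the MaxTotalCPU /
MaxTotalMemory clamping.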