From 3ff28163dbede8fa1ed6f4ab9d6ddbe0fcd4826a Mon Sep 17 00:00:00 2001 From: Reed Allman Date: Wed, 2 Aug 2017 17:11:47 -0700 Subject: [PATCH] fix task memory prior to this patch we were allowing 256MB for every function run, just because that was the default for the docker driver and we were not using the memory field on any given route configuration. this fixes that, now docker containers will get the correct memory limit passed into the container from the route. the default is still 128. there is also an env var now, `MEMORY_MB` that is set on each function call, see the linked issue below for rationale. closes #186 ran the given function code from #186, and now i only see allocations up to 32MB before the function is killed. yay. notes: there is no max for memory. for open source fn i'm not sure we want to cap it, really. in the services repo we probably should add a cap before prod. since we don't know any given fn server's ram, we can't try to make sure the setting on any given route is something that can even be run. remove envconfig & bytefmt this updates the glide.yaml file to remove the unused deps, but trying to install fresh is broken atm so i couldn't remove from vendor/, going to fix separately (next update we just won't get these). also changed the skip dir to be the cli dir now that its name has changed (related to brokenness). fix how ram slots were being allocated. integer division is significantly slower than subtraction. --- api/runner/async_runner.go | 1 + api/runner/common/stats/stats.go | 6 +-- api/runner/common/stats/statsd.go | 6 +-- api/runner/drivers/docker/docker.go | 3 +- api/runner/drivers/docker/docker_test.go | 18 +-------- api/runner/drivers/driver.go | 47 +++++++++--------------- api/runner/runner.go | 8 +--- api/runner/runner_test.go | 18 +++++++++ api/runner/task.go | 18 ++++----- api/server/runner.go | 1 + glide.yaml | 4 +- 11 files changed, 57 insertions(+), 73 deletions(-) diff --git a/api/runner/async_runner.go b/api/runner/async_runner.go index 9760164b8..73a9d06a4 100644 --- a/api/runner/async_runner.go +++ b/api/runner/async_runner.go @@ -72,6 +72,7 @@ func getCfg(t *models.Task) *task.Config { Image: *t.Image, ID: t.ID, AppName: t.AppName, + Memory: 128, Env: t.EnvVars, Ready: make(chan struct{}), Stdin: strings.NewReader(t.Payload), diff --git a/api/runner/common/stats/stats.go b/api/runner/common/stats/stats.go index 4f4ea8975..84eee3b21 100644 --- a/api/runner/common/stats/stats.go +++ b/api/runner/common/stats/stats.go @@ -15,14 +15,14 @@ type HTTPSubHandler interface { } type Config struct { - Interval float64 `json:"interval" envconfig:"STATS_INTERVAL"` // seconds + Interval float64 `json:"interval"` // seconds History int // minutes - Log string `json:"log" envconfig:"STATS_LOG"` + Log string `json:"log"` StatHat *StatHatReporterConfig NewRelic *NewRelicReporterConfig Statsd *StatsdConfig - GCStats int `json:"gc_stats" envconfig:"GC_STATS"` // seconds + GCStats int `json:"gc_stats"` } type Statter interface { diff --git a/api/runner/common/stats/statsd.go b/api/runner/common/stats/statsd.go index e1bfc0947..29ce166f9 100644 --- a/api/runner/common/stats/statsd.go +++ b/api/runner/common/stats/statsd.go @@ -11,9 +11,9 @@ import ( ) type StatsdConfig struct { - StatsdUdpTarget string `json:"target" mapstructure:"target" envconfig:"STATSD_TARGET"` - Interval int64 `json:"interval" envconfig:"STATSD_INTERVAL"` - Prefix string `json:"prefix" envconfig:"STATSD_PREFIX"` + StatsdUdpTarget string `json:"target" mapstructure:"target"` + Interval int64 `json:"interval"` + Prefix string `json:"prefix"` } type keyCreator interface { diff --git a/api/runner/drivers/docker/docker.go b/api/runner/drivers/docker/docker.go index 2b6b02cb4..8de7945a5 100644 --- a/api/runner/drivers/docker/docker.go +++ b/api/runner/drivers/docker/docker.go @@ -218,7 +218,6 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask var cmd []string if task.Command() != "" { // NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests - // task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe cmd = strings.Fields(task.Command()) log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command") } @@ -234,7 +233,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Config: &docker.Config{ Env: envvars, Cmd: cmd, - Memory: int64(drv.conf.Memory), + Memory: int64(task.Memory()), CPUShares: drv.conf.CPUShares, Hostname: drv.hostname, Image: task.Image(), diff --git a/api/runner/drivers/docker/docker_test.go b/api/runner/drivers/docker/docker_test.go index f764bb560..4fbcdefc0 100644 --- a/api/runner/drivers/docker/docker_test.go +++ b/api/runner/drivers/docker/docker_test.go @@ -4,14 +4,12 @@ import ( "bytes" "context" "io" - "os" "strings" "testing" "time" "github.com/fnproject/fn/api/runner/common" "github.com/fnproject/fn/api/runner/drivers" - "github.com/vrischmann/envconfig" ) type taskDockerTest struct { @@ -32,6 +30,7 @@ func (f *taskDockerTest) Timeout() time.Duration { return 30 * time. func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil } func (f *taskDockerTest) WriteStat(drivers.Stat) { /* TODO */ } func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} } +func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 } func (f *taskDockerTest) WorkDir() string { return "" } func (f *taskDockerTest) Close() {} func (f *taskDockerTest) Input() io.Reader { return f.input } @@ -90,18 +89,3 @@ func TestRunnerDockerStdin(t *testing.T) { t.Errorf("Test expected output to contain '%s', got '%s'", expect, got) } } - -func TestConfigLoadMemory(t *testing.T) { - if err := os.Setenv("MEMORY_PER_JOB", "128M"); err != nil { - t.Fatalf("Could not set MEMORY_PER_JOB: %v", err) - } - - var conf drivers.Config - if err := envconfig.Init(&conf); err != nil { - t.Fatalf("Could not read config: %v", err) - } - - if conf.Memory != 128*1024*1024 { - t.Fatalf("Memory read from config should match 128M, got %d", conf.Memory) - } -} diff --git a/api/runner/drivers/driver.go b/api/runner/drivers/driver.go index 2ae63a11d..922a3623d 100644 --- a/api/runner/drivers/driver.go +++ b/api/runner/drivers/driver.go @@ -8,8 +8,6 @@ import ( "io" "strings" "time" - - "code.cloudfoundry.org/bytefmt" ) // A DriverCookie identifies a unique request to run a task. @@ -68,26 +66,41 @@ type RunResult interface { type ContainerTask interface { // Command returns the command to run within the container. Command() string + // EnvVars returns environment variable key-value pairs. EnvVars() map[string]string + // Input feeds the container with data Input() io.Reader + // Labels returns container label key-value pairs. Labels() map[string]string + + // The id to assign the container Id() string + // Image returns the runtime specific image to run. Image() string + // Timeout specifies the maximum time a task is allowed to run. Return 0 to let it run forever. Timeout() time.Duration + // Driver will write output log from task execution to these writers. Must be // non-nil. Use io.Discard if log is irrelevant. Logger() (stdout, stderr io.Writer) + // WriteStat writes a single Stat, implementation need not be thread safe. WriteStat(Stat) + // Volumes returns an array of 2-element tuples indicating storage volume mounts. // The first element is the path on the host, and the second element is the // path in the container. Volumes() [][2]string + + // Memory determines the max amount of RAM given to the container to use. + // 0 is unlimited. + Memory() uint64 + // WorkDir returns the working directory to use for the task. Empty string // leaves it unset. WorkDir() string @@ -129,40 +142,16 @@ const ( StatusCancelled = "cancelled" ) -// Allows us to implement custom unmarshaling of JSON and envconfig. -type Memory uint64 - -func (m *Memory) Unmarshal(s string) error { - temp, err := bytefmt.ToBytes(s) - if err != nil { - return err - } - - *m = Memory(temp) - return nil -} - -func (m *Memory) UnmarshalJSON(p []byte) error { - temp, err := bytefmt.ToBytes(string(p)) - if err != nil { - return err - } - - *m = Memory(temp) - return nil -} - type Config struct { - Docker string `json:"docker" envconfig:"default=unix:///var/run/docker.sock,DOCKER"` - Memory Memory `json:"memory" envconfig:"default=256M,MEMORY_PER_JOB"` - CPUShares int64 `json:"cpu_shares" envconfig:"default=2,CPU_SHARES"` + Docker string `json:"docker"` + // TODO CPUShares should likely be on a per container basis + CPUShares int64 `json:"cpu_shares"` } // for tests func DefaultConfig() Config { return Config{ Docker: "unix:///var/run/docker.sock", - Memory: 256 * 1024 * 1024, CPUShares: 0, } } diff --git a/api/runner/runner.go b/api/runner/runner.go index 8da4ca0dd..57fd4180e 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -132,7 +132,7 @@ func (r *Runner) hasAsyncAvailableMemory() bool { func (r *Runner) checkRequiredMem(req uint64) bool { r.usedMemMutex.RLock() defer r.usedMemMutex.RUnlock() - return (r.availableMem-r.usedMem)/int64(req)*1024*1024 > 0 + return r.availableMem-r.usedMem-(int64(req)*1024*1024) > 0 } func (r *Runner) addUsedMem(used int64) { @@ -150,7 +150,7 @@ func (r *Runner) checkMemAndUse(req uint64) bool { used := int64(req) * 1024 * 1024 - if (r.availableMem-r.usedMem)/used < 0 { + if r.availableMem-r.usedMem-used < 0 { return false } @@ -189,10 +189,6 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult, span, ctx := opentracing.StartSpanFromContext(ctx, "run_container") defer span.Finish() - if cfg.Memory == 0 { - cfg.Memory = 128 - } - if cfg.Stdout == nil { // TODO why? async? cfg.Stdout = cfg.Stderr diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 5eae9f05c..c6febf37f 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -44,6 +44,7 @@ func TestRunnerHello(t *testing.T) { ID: test.taskID, Image: test.route.Image, Timeout: 10 * time.Second, + Memory: 128, Ready: make(chan struct{}), Stdin: strings.NewReader(test.payload), AppName: test.route.AppName, @@ -103,6 +104,7 @@ func TestRunnerError(t *testing.T) { ID: fmt.Sprintf("err-%d-%d", i, time.Now().Unix()), Image: test.route.Image, Timeout: 10 * time.Second, + Memory: 128, Ready: make(chan struct{}), Stdin: strings.NewReader(test.payload), Stdout: &stdout, @@ -131,3 +133,19 @@ func TestRunnerError(t *testing.T) { } } } + +func TestRunnerMemory(t *testing.T) { + // make sure we get MB out of a task.Config when turned into a containerTask + // (so if Config.Memory changes to not be MB we hear about it) + + cfg := &task.Config{ + Memory: 128, + } + + task := &containerTask{cfg: cfg} + + const exp = 128 * 1024 * 1024 + if task.Memory() != exp { + t.Fatalf("Expected Memory to return %v but got %v", exp, task.Memory()) + } +} diff --git a/api/runner/task.go b/api/runner/task.go index 03c0ee098..bb729f58e 100644 --- a/api/runner/task.go +++ b/api/runner/task.go @@ -19,6 +19,7 @@ import ( var registries dockerRegistries func init() { + // TODO this is docker specific. and the docker client is capable of doing this, remove & test // Attempt to fetch it from an environment variable regsettings := os.Getenv("DOCKER_AUTH") @@ -54,37 +55,34 @@ func init() { } +// TODO task.Config should implement the interface. this is sad :( +// implements drivers.ContainerTask type containerTask struct { ctx context.Context cfg *task.Config canRun chan bool } -func (t *containerTask) Command() string { return "" } - func (t *containerTask) EnvVars() map[string]string { if protocol.IsStreamable(protocol.Protocol(t.cfg.Format)) { return t.cfg.BaseEnv } return t.cfg.Env } -func (t *containerTask) Input() io.Reader { - return t.cfg.Stdin -} func (t *containerTask) Labels() map[string]string { - return map[string]string{ - "LogName": t.cfg.AppName, - } + // TODO this seems inaccurate? is this used by anyone (dev or not)? + return map[string]string{"LogName": t.cfg.AppName} } +func (t *containerTask) Command() string { return "" } +func (t *containerTask) Input() io.Reader { return t.cfg.Stdin } func (t *containerTask) Id() string { return t.cfg.ID } -func (t *containerTask) Route() string { return "" } func (t *containerTask) Image() string { return t.cfg.Image } func (t *containerTask) Timeout() time.Duration { return t.cfg.Timeout } -func (t *containerTask) IdleTimeout() time.Duration { return t.cfg.IdleTimeout } func (t *containerTask) Logger() (io.Writer, io.Writer) { return t.cfg.Stdout, t.cfg.Stderr } func (t *containerTask) Volumes() [][2]string { return [][2]string{} } +func (t *containerTask) Memory() uint64 { return t.cfg.Memory * 1024 * 1024 } // convert MB func (t *containerTask) WorkDir() string { return "" } func (t *containerTask) Close() {} func (t *containerTask) WriteStat(drivers.Stat) {} diff --git a/api/server/runner.go b/api/server/runner.go index dae350af0..439abdf30 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -148,6 +148,7 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout baseVars["FN_FORMAT"] = route.Format baseVars["APP_NAME"] = appName baseVars["ROUTE"] = route.Path + baseVars["MEMORY_MB"] = fmt.Sprintf("%d", route.Memory) // app config for k, v := range app.Config { diff --git a/glide.yaml b/glide.yaml index 438f13464..238c0c10a 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,8 +1,7 @@ package: github.com/fnproject/fn excludeDirs: -- fn +- cli import: -- package: code.cloudfoundry.org/bytefmt - package: github.com/funcy/functions_go version: ^0.1.35 subpackages: @@ -72,7 +71,6 @@ import: - package: github.com/opentracing/opentracing-go - package: github.com/openzipkin/zipkin-go-opentracing testImport: -- package: github.com/vrischmann/envconfig - package: github.com/opencontainers/go-digest branch: master - package: github.com/patrickmn/go-cache