mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge pull request #191 from fnproject/fix-memory-limits
fix task memory
This commit is contained in:
@@ -72,6 +72,7 @@ func getCfg(t *models.Task) *task.Config {
|
||||
Image: *t.Image,
|
||||
ID: t.ID,
|
||||
AppName: t.AppName,
|
||||
Memory: 128,
|
||||
Env: t.EnvVars,
|
||||
Ready: make(chan struct{}),
|
||||
Stdin: strings.NewReader(t.Payload),
|
||||
|
||||
@@ -15,14 +15,14 @@ type HTTPSubHandler interface {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Interval float64 `json:"interval" envconfig:"STATS_INTERVAL"` // seconds
|
||||
Interval float64 `json:"interval"` // seconds
|
||||
History int // minutes
|
||||
|
||||
Log string `json:"log" envconfig:"STATS_LOG"`
|
||||
Log string `json:"log"`
|
||||
StatHat *StatHatReporterConfig
|
||||
NewRelic *NewRelicReporterConfig
|
||||
Statsd *StatsdConfig
|
||||
GCStats int `json:"gc_stats" envconfig:"GC_STATS"` // seconds
|
||||
GCStats int `json:"gc_stats"`
|
||||
}
|
||||
|
||||
type Statter interface {
|
||||
|
||||
@@ -11,9 +11,9 @@ import (
|
||||
)
|
||||
|
||||
type StatsdConfig struct {
|
||||
StatsdUdpTarget string `json:"target" mapstructure:"target" envconfig:"STATSD_TARGET"`
|
||||
Interval int64 `json:"interval" envconfig:"STATSD_INTERVAL"`
|
||||
Prefix string `json:"prefix" envconfig:"STATSD_PREFIX"`
|
||||
StatsdUdpTarget string `json:"target" mapstructure:"target"`
|
||||
Interval int64 `json:"interval"`
|
||||
Prefix string `json:"prefix"`
|
||||
}
|
||||
|
||||
type keyCreator interface {
|
||||
|
||||
@@ -218,7 +218,6 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
|
||||
var cmd []string
|
||||
if task.Command() != "" {
|
||||
// NOTE: this is hyper-sensitive and may not be correct like this even, but it passes old tests
|
||||
// task.Command() in swapi is always "sh /mnt/task/.runtask" so fields is safe
|
||||
cmd = strings.Fields(task.Command())
|
||||
log.WithFields(logrus.Fields{"call_id": task.Id(), "cmd": cmd, "len": len(cmd)}).Debug("docker command")
|
||||
}
|
||||
@@ -234,7 +233,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask
|
||||
Config: &docker.Config{
|
||||
Env: envvars,
|
||||
Cmd: cmd,
|
||||
Memory: int64(drv.conf.Memory),
|
||||
Memory: int64(task.Memory()),
|
||||
CPUShares: drv.conf.CPUShares,
|
||||
Hostname: drv.hostname,
|
||||
Image: task.Image(),
|
||||
|
||||
@@ -4,14 +4,12 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/vrischmann/envconfig"
|
||||
)
|
||||
|
||||
type taskDockerTest struct {
|
||||
@@ -32,6 +30,7 @@ func (f *taskDockerTest) Timeout() time.Duration { return 30 * time.
|
||||
func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.output, nil }
|
||||
func (f *taskDockerTest) WriteStat(drivers.Stat) { /* TODO */ }
|
||||
func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} }
|
||||
func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 }
|
||||
func (f *taskDockerTest) WorkDir() string { return "" }
|
||||
func (f *taskDockerTest) Close() {}
|
||||
func (f *taskDockerTest) Input() io.Reader { return f.input }
|
||||
@@ -90,18 +89,3 @@ func TestRunnerDockerStdin(t *testing.T) {
|
||||
t.Errorf("Test expected output to contain '%s', got '%s'", expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigLoadMemory(t *testing.T) {
|
||||
if err := os.Setenv("MEMORY_PER_JOB", "128M"); err != nil {
|
||||
t.Fatalf("Could not set MEMORY_PER_JOB: %v", err)
|
||||
}
|
||||
|
||||
var conf drivers.Config
|
||||
if err := envconfig.Init(&conf); err != nil {
|
||||
t.Fatalf("Could not read config: %v", err)
|
||||
}
|
||||
|
||||
if conf.Memory != 128*1024*1024 {
|
||||
t.Fatalf("Memory read from config should match 128M, got %d", conf.Memory)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.cloudfoundry.org/bytefmt"
|
||||
)
|
||||
|
||||
// A DriverCookie identifies a unique request to run a task.
|
||||
@@ -68,26 +66,41 @@ type RunResult interface {
|
||||
type ContainerTask interface {
|
||||
// Command returns the command to run within the container.
|
||||
Command() string
|
||||
|
||||
// EnvVars returns environment variable key-value pairs.
|
||||
EnvVars() map[string]string
|
||||
|
||||
// Input feeds the container with data
|
||||
Input() io.Reader
|
||||
|
||||
// Labels returns container label key-value pairs.
|
||||
Labels() map[string]string
|
||||
|
||||
// The id to assign the container
|
||||
Id() string
|
||||
|
||||
// Image returns the runtime specific image to run.
|
||||
Image() string
|
||||
|
||||
// Timeout specifies the maximum time a task is allowed to run. Return 0 to let it run forever.
|
||||
Timeout() time.Duration
|
||||
|
||||
// Driver will write output log from task execution to these writers. Must be
|
||||
// non-nil. Use io.Discard if log is irrelevant.
|
||||
Logger() (stdout, stderr io.Writer)
|
||||
|
||||
// WriteStat writes a single Stat, implementation need not be thread safe.
|
||||
WriteStat(Stat)
|
||||
|
||||
// Volumes returns an array of 2-element tuples indicating storage volume mounts.
|
||||
// The first element is the path on the host, and the second element is the
|
||||
// path in the container.
|
||||
Volumes() [][2]string
|
||||
|
||||
// Memory determines the max amount of RAM given to the container to use.
|
||||
// 0 is unlimited.
|
||||
Memory() uint64
|
||||
|
||||
// WorkDir returns the working directory to use for the task. Empty string
|
||||
// leaves it unset.
|
||||
WorkDir() string
|
||||
@@ -129,40 +142,16 @@ const (
|
||||
StatusCancelled = "cancelled"
|
||||
)
|
||||
|
||||
// Allows us to implement custom unmarshaling of JSON and envconfig.
|
||||
type Memory uint64
|
||||
|
||||
func (m *Memory) Unmarshal(s string) error {
|
||||
temp, err := bytefmt.ToBytes(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*m = Memory(temp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Memory) UnmarshalJSON(p []byte) error {
|
||||
temp, err := bytefmt.ToBytes(string(p))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*m = Memory(temp)
|
||||
return nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Docker string `json:"docker" envconfig:"default=unix:///var/run/docker.sock,DOCKER"`
|
||||
Memory Memory `json:"memory" envconfig:"default=256M,MEMORY_PER_JOB"`
|
||||
CPUShares int64 `json:"cpu_shares" envconfig:"default=2,CPU_SHARES"`
|
||||
Docker string `json:"docker"`
|
||||
// TODO CPUShares should likely be on a per container basis
|
||||
CPUShares int64 `json:"cpu_shares"`
|
||||
}
|
||||
|
||||
// for tests
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
Docker: "unix:///var/run/docker.sock",
|
||||
Memory: 256 * 1024 * 1024,
|
||||
CPUShares: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ func (r *Runner) hasAsyncAvailableMemory() bool {
|
||||
func (r *Runner) checkRequiredMem(req uint64) bool {
|
||||
r.usedMemMutex.RLock()
|
||||
defer r.usedMemMutex.RUnlock()
|
||||
return (r.availableMem-r.usedMem)/int64(req)*1024*1024 > 0
|
||||
return r.availableMem-r.usedMem-(int64(req)*1024*1024) > 0
|
||||
}
|
||||
|
||||
func (r *Runner) addUsedMem(used int64) {
|
||||
@@ -150,7 +150,7 @@ func (r *Runner) checkMemAndUse(req uint64) bool {
|
||||
|
||||
used := int64(req) * 1024 * 1024
|
||||
|
||||
if (r.availableMem-r.usedMem)/used < 0 {
|
||||
if r.availableMem-r.usedMem-used < 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -189,10 +189,6 @@ func (r *Runner) run(ctx context.Context, cfg *task.Config) (drivers.RunResult,
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "run_container")
|
||||
defer span.Finish()
|
||||
|
||||
if cfg.Memory == 0 {
|
||||
cfg.Memory = 128
|
||||
}
|
||||
|
||||
if cfg.Stdout == nil {
|
||||
// TODO why? async?
|
||||
cfg.Stdout = cfg.Stderr
|
||||
|
||||
@@ -44,6 +44,7 @@ func TestRunnerHello(t *testing.T) {
|
||||
ID: test.taskID,
|
||||
Image: test.route.Image,
|
||||
Timeout: 10 * time.Second,
|
||||
Memory: 128,
|
||||
Ready: make(chan struct{}),
|
||||
Stdin: strings.NewReader(test.payload),
|
||||
AppName: test.route.AppName,
|
||||
@@ -103,6 +104,7 @@ func TestRunnerError(t *testing.T) {
|
||||
ID: fmt.Sprintf("err-%d-%d", i, time.Now().Unix()),
|
||||
Image: test.route.Image,
|
||||
Timeout: 10 * time.Second,
|
||||
Memory: 128,
|
||||
Ready: make(chan struct{}),
|
||||
Stdin: strings.NewReader(test.payload),
|
||||
Stdout: &stdout,
|
||||
@@ -131,3 +133,19 @@ func TestRunnerError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunnerMemory(t *testing.T) {
|
||||
// make sure we get MB out of a task.Config when turned into a containerTask
|
||||
// (so if Config.Memory changes to not be MB we hear about it)
|
||||
|
||||
cfg := &task.Config{
|
||||
Memory: 128,
|
||||
}
|
||||
|
||||
task := &containerTask{cfg: cfg}
|
||||
|
||||
const exp = 128 * 1024 * 1024
|
||||
if task.Memory() != exp {
|
||||
t.Fatalf("Expected Memory to return %v but got %v", exp, task.Memory())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
var registries dockerRegistries
|
||||
|
||||
func init() {
|
||||
// TODO this is docker specific. and the docker client is capable of doing this, remove & test
|
||||
|
||||
// Attempt to fetch it from an environment variable
|
||||
regsettings := os.Getenv("DOCKER_AUTH")
|
||||
@@ -54,37 +55,34 @@ func init() {
|
||||
|
||||
}
|
||||
|
||||
// TODO task.Config should implement the interface. this is sad :(
|
||||
// implements drivers.ContainerTask
|
||||
type containerTask struct {
|
||||
ctx context.Context
|
||||
cfg *task.Config
|
||||
canRun chan bool
|
||||
}
|
||||
|
||||
func (t *containerTask) Command() string { return "" }
|
||||
|
||||
func (t *containerTask) EnvVars() map[string]string {
|
||||
if protocol.IsStreamable(protocol.Protocol(t.cfg.Format)) {
|
||||
return t.cfg.BaseEnv
|
||||
}
|
||||
return t.cfg.Env
|
||||
}
|
||||
func (t *containerTask) Input() io.Reader {
|
||||
return t.cfg.Stdin
|
||||
}
|
||||
|
||||
func (t *containerTask) Labels() map[string]string {
|
||||
return map[string]string{
|
||||
"LogName": t.cfg.AppName,
|
||||
}
|
||||
// TODO this seems inaccurate? is this used by anyone (dev or not)?
|
||||
return map[string]string{"LogName": t.cfg.AppName}
|
||||
}
|
||||
|
||||
func (t *containerTask) Command() string { return "" }
|
||||
func (t *containerTask) Input() io.Reader { return t.cfg.Stdin }
|
||||
func (t *containerTask) Id() string { return t.cfg.ID }
|
||||
func (t *containerTask) Route() string { return "" }
|
||||
func (t *containerTask) Image() string { return t.cfg.Image }
|
||||
func (t *containerTask) Timeout() time.Duration { return t.cfg.Timeout }
|
||||
func (t *containerTask) IdleTimeout() time.Duration { return t.cfg.IdleTimeout }
|
||||
func (t *containerTask) Logger() (io.Writer, io.Writer) { return t.cfg.Stdout, t.cfg.Stderr }
|
||||
func (t *containerTask) Volumes() [][2]string { return [][2]string{} }
|
||||
func (t *containerTask) Memory() uint64 { return t.cfg.Memory * 1024 * 1024 } // convert MB
|
||||
func (t *containerTask) WorkDir() string { return "" }
|
||||
func (t *containerTask) Close() {}
|
||||
func (t *containerTask) WriteStat(drivers.Stat) {}
|
||||
|
||||
@@ -148,6 +148,8 @@ func (s *Server) serve(ctx context.Context, c *gin.Context, appName string, rout
|
||||
baseVars["FN_FORMAT"] = route.Format
|
||||
baseVars["APP_NAME"] = appName
|
||||
baseVars["ROUTE"] = route.Path
|
||||
// TODO add this back after #193 #195 (fix async RAM)
|
||||
// baseVars["MEMORY_MB"] = fmt.Sprintf("%d", route.Memory)
|
||||
|
||||
// app config
|
||||
for k, v := range app.Config {
|
||||
|
||||
Reference in New Issue
Block a user