diff --git a/api/runner/runner.go b/api/runner/runner.go index d640b2c4d..e88d352ad 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -1,12 +1,19 @@ package runner import ( + "bufio" + "errors" "fmt" "io" + "io/ioutil" + "os" + "strconv" + "strings" "time" "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/iron-io/titan/common" "github.com/iron-io/titan/runner/agent" "github.com/iron-io/titan/runner/drivers" @@ -20,15 +27,24 @@ type Config struct { Image string Timeout time.Duration AppName string + Memory uint64 Env map[string]string Stdout io.Writer Stderr io.Writer } type Runner struct { - driver drivers.Driver + driver drivers.Driver + taskQueue chan *containerTask } +var ( + ErrTimeOutNoMemory = errors.New("Task timed out. No available memory.") + ErrFullQueue = errors.New("The runner queue is full") + + WaitMemoryTimeout = 3 * time.Second +) + func New() (*Runner, error) { // TODO: Is this really required for Titan's driver? // Can we remove it? @@ -40,30 +56,107 @@ func New() (*Runner, error) { return nil, err } - return &Runner{ - driver: driver, - }, nil + r := &Runner{ + driver: driver, + taskQueue: make(chan *containerTask, 0), + } + + go r.queueHandler() + + return r, nil +} + +// This routine checks for available memory; +// If there's memory then send signal to the task to proceed. 
+// If there's not available memory to run the task it waits +// If the task waits for more than X seconds it times out +func (r *Runner) queueHandler() { + var task *containerTask + var waitStart time.Time + var waitTime time.Duration + var timedOut bool + for { + select { + case task = <-r.taskQueue: + waitStart = time.Now() + timedOut = false + } + + // Loop waiting for available memory + avail := dynamicSizing(task.cfg.Memory) + for ; avail == 0; avail = dynamicSizing(task.cfg.Memory) { + waitTime = time.Since(waitStart) + if waitTime > WaitMemoryTimeout { + timedOut = true + break + } + } + + metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) + LogMetricTime(task.ctx, (metricBaseName + "waittime"), waitTime) + + if timedOut { + // Send a signal to this task saying it cannot run + task.canRun <- false + continue + } + + // Send a signal to this task saying it can run + task.canRun <- true + } } func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) { var err error ctask := &containerTask{ - cfg: cfg, - auth: &agent.ConfigAuth{}, + ctx: ctx, + cfg: cfg, + auth: &agent.ConfigAuth{}, + canRun: make(chan bool), } + metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName) + LogMetricCount(ctx, (metricBaseName + "requests"), 1) + closer, err := r.driver.Prepare(ctx, ctask) if err != nil { return nil, err } defer closer.Close() + // Check if there is enough available memory + if dynamicSizing(cfg.Memory) == 0 { + // If not, try to add the task to the queue + select { + case r.taskQueue <- ctask: + default: + // If queue is full, return error + return nil, ErrFullQueue + } + + // If task was added to the queue, wait for permission + if ok := <-ctask.canRun; !ok { + // This task timed out; no memory available + return nil, ErrTimeOutNoMemory + } + } + + metricStart := time.Now() result, err := r.driver.Run(ctx, ctask) if err != nil { return nil, err } + if result.Status() == "success" { + LogMetricCount(ctx, (metricBaseName + "succeeded"), 1) + } 
else { + LogMetricCount(ctx, (metricBaseName + "error"), 1) + } + + metricElapsed := time.Since(metricStart) + LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed) + return result, nil } @@ -90,3 +183,77 @@ func selectDriver(driver string, env *common.Environment, conf *driverscommon.Co } return nil, fmt.Errorf("driver %v not found", driver) } + +func dynamicSizing(reqMem uint64) int { + const tooBig = 322122547200 // 300 GiB sanity cap; above this (or 0) means no real limit was set — biggest aws instance is 244GB + + availableMemory, err := checkCgroup() + if err != nil { + logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..") + } + if availableMemory > tooBig || availableMemory == 0 { + // Then -m flag probably wasn't set, so use max available on system + availableMemory, err = checkProc() + if availableMemory > tooBig || availableMemory == 0 { + logrus.WithError(err).Fatal("Your Linux version is too old (<3.14), so the available memory cannot be determined. You must specify the maximum available memory by passing the -m flag to docker run when starting the runner via docker, eg: `docker run -m 2G ...`") + } + } + + c := availableMemory / (reqMem * 1024 * 1024) // NOTE(review): panics if reqMem == 0 — callers must supply a positive per-task memory requirement + + return int(c) +} + +func checkCgroup() (uint64, error) { + f, err := os.Open("/sys/fs/cgroup/memory/memory.limit_in_bytes") + if err != nil { + return 0, err + } + defer f.Close() + b, err := ioutil.ReadAll(f) + if err != nil { + return 0, err + } + limBytes := string(b) + limBytes = strings.TrimSpace(limBytes) + return strconv.ParseUint(limBytes, 10, 64) +} + +func checkProc() (uint64, error) { + f, err := os.Open("/proc/meminfo") + if err != nil { + return 0, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + b := scanner.Text() + if !strings.HasPrefix(b, "MemAvailable") { + continue + } + + // expect form: + // MemAvailable: 1234567890 kB + tri := strings.Fields(b) + if len(tri) != 3 { + return 0, fmt.Errorf("MemAvailable line has unexpected format: %v", 
b) + } + + c, err := strconv.ParseUint(tri[1], 10, 64) + if err != nil { + return 0, fmt.Errorf("Could not parse MemAvailable: %v", b) + } + switch tri[2] { // convert units to bytes + case "kB": + c *= 1024 + case "MB": + c *= 1024 * 1024 + default: + return 0, fmt.Errorf("Unexpected units for MemAvailable in /proc/meminfo, need kB or MB, got: %v", tri[2]) + } + return c, nil + } + + return 0, fmt.Errorf("Didn't find MemAvailable in /proc/meminfo, kernel is probably < 3.14") +} diff --git a/api/runner/task.go b/api/runner/task.go index 724446d43..a62187fc8 100644 --- a/api/runner/task.go +++ b/api/runner/task.go @@ -3,14 +3,18 @@ package runner import ( "io" + "golang.org/x/net/context" + dockercli "github.com/fsouza/go-dockerclient" "github.com/iron-io/titan/runner/drivers" "github.com/iron-io/titan/runner/tasker" ) type containerTask struct { - auth tasker.Auther - cfg *Config + ctx context.Context + auth tasker.Auther + cfg *Config + canRun chan bool } func (t *containerTask) Command() string { return "" } diff --git a/api/server/runner.go b/api/server/runner.go index 5f9ff5b73..7a120dd17 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -108,10 +108,6 @@ func handleRunner(c *gin.Context) { log = log.WithFields(logrus.Fields{ "app": appName, "route": el.Path, "image": el.Image, "request_id": reqID}) - // Request count metric - metricBaseName := "server.handleRunner." + appName + "." - runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1) - if params, match := matchRoute(el.Path, route); match { var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. 
akin to: https://golang.org/pkg/io/#LimitReader @@ -152,9 +148,9 @@ func handleRunner(c *gin.Context) { Stdout: &stdout, Stderr: stderr, Env: envVars, + Memory: el.Memory, } - metricStart := time.Now() if result, err := Api.Runner.Run(c, cfg); err != nil { log.WithError(err).Error(models.ErrRunnerRunRoute) c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) @@ -165,19 +161,11 @@ func handleRunner(c *gin.Context) { if result.Status() == "success" { c.Data(http.StatusOK, "", stdout.Bytes()) - runner.LogMetricCount(ctx, (metricBaseName + "succeeded"), 1) - } else { - // log.WithFields(logrus.Fields{"app": appName, "route": el, "req_id": reqID}).Debug(stderr.String()) - // Error count metric - runner.LogMetricCount(ctx, (metricBaseName + "error"), 1) c.AbortWithStatus(http.StatusInternalServerError) } } - // Execution time metric - metricElapsed := time.Since(metricStart) - runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed) return } }