diff --git a/api/datastore/postgres/postgres.go b/api/datastore/postgres/postgres.go index 17df7086b..eb46f3cdc 100644 --- a/api/datastore/postgres/postgres.go +++ b/api/datastore/postgres/postgres.go @@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS routes ( app_name character varying(256) NOT NULL, path text NOT NULL, image character varying(256) NOT NULL, + memory integer NOT NULL, headers text NOT NULL, config text NOT NULL, PRIMARY KEY (app_name, path) @@ -31,7 +32,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras ( value character varying(256) NOT NULL );` -const routeSelector = `SELECT app_name, path, image, headers, config FROM routes` +const routeSelector = `SELECT app_name, path, image, memory, headers, config FROM routes` type rowScanner interface { Scan(dest ...interface{}) error @@ -182,6 +183,7 @@ func (ds *PostgresDatastore) StoreRoute(route *models.Route) (*models.Route, err app_name, path, image, + memory, headers, config ) @@ -189,12 +191,14 @@ func (ds *PostgresDatastore) StoreRoute(route *models.Route) (*models.Route, err ON CONFLICT (app_name, path) DO UPDATE SET path = $2, image = $3, - headers = $4, - config = $5; + memory = $4, + headers = $5, + config = $6; `, route.AppName, route.Path, route.Image, + route.Memory, string(hbyte), string(cbyte), ) @@ -225,6 +229,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error { &route.AppName, &route.Path, &route.Image, + &route.Memory, &headerStr, &configStr, ) diff --git a/api/models/route.go b/api/models/route.go index b83566290..e146311f3 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -24,6 +24,7 @@ type Route struct { AppName string `json:"appname,omitempty"` Path string `json:"path,omitempty"` Image string `json:"image,omitempty"` + Memory uint64 `json:"memory,omitempty"` Headers http.Header `json:"headers,omitempty"` Config `json:"config"` } @@ -43,6 +44,10 @@ func (r *Route) Validate() error { res = append(res, ErrRoutesValidationMissingImage) } + if r.Memory == 0 { + r.Memory = 128 + } + if r.AppName == "" { res = append(res, ErrRoutesValidationMissingAppName) } diff --git a/api/runner/metrics.go b/api/runner/metrics.go index 2d34a65bb..2ddceb3b6 100644 --- a/api/runner/metrics.go +++ b/api/runner/metrics.go @@ -8,20 +8,46 @@ import ( "golang.org/x/net/context" ) -func LogMetric(ctx context.Context, name string, metricType string, value interface{}) { +type Logger interface { + Log(context.Context, map[string]interface{}) + LogCount(context.Context, string, int) + LogGauge(context.Context, string, int) + LogTime(context.Context, string, time.Duration) +} + +type Metric map[string]interface{} + +func NewMetricLogger() *MetricLogger { + return &MetricLogger{} +} + +type MetricLogger struct{} + +func (l *MetricLogger) Log(ctx context.Context, metric map[string]interface{}) { log := titancommon.Logger(ctx) - log.WithFields(logrus.Fields{ - "metric": name, "type": metricType, "value": value}).Info() + log.WithFields(logrus.Fields(metric)).Info() } -func LogMetricGauge(ctx context.Context, name string, value int) { - LogMetric(ctx, name, "gauge", value) +func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) { + l.Log(ctx, Metric{ + "name": name, + "value": value, + "type": "count", + }) } -func LogMetricCount(ctx context.Context, name string, value int) { - LogMetric(ctx, name, "count", value) +func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Duration) { + l.Log(ctx, Metric{ + "name": name, + "value": value, + "type": "time", + }) } -func LogMetricTime(ctx context.Context, name string, time time.Duration) { - LogMetric(ctx, name, "time", time) +func (l *MetricLogger) LogGauge(ctx context.Context, name string, value int) { + l.Log(ctx, Metric{ + "name": name, + "value": value, + "type": "gauge", + }) } diff --git a/api/runner/runner.go b/api/runner/runner.go index d640b2c4d..bfe26fccf 100644 --- a/api/runner/runner.go +++ b/api/runner/runner.go @@ -1,12 +1,20 @@ package runner import ( + "bufio" + "errors" "fmt" "io" + "io/ioutil" + "os" + "strconv" + "strings" + "sync" "time" "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/iron-io/titan/common" "github.com/iron-io/titan/runner/agent" "github.com/iron-io/titan/runner/drivers" @@ -20,16 +28,29 @@ type Config struct { Image string Timeout time.Duration AppName string + Memory uint64 Env map[string]string Stdout io.Writer Stderr io.Writer } type Runner struct { - driver drivers.Driver + driver drivers.Driver + taskQueue chan *containerTask + ml Logger + availableMem int64 + usedMem int64 + usedMemMutex sync.RWMutex } -func New() (*Runner, error) { +var ( + ErrTimeOutNoMemory = errors.New("Task timed out. No available memory.") + ErrFullQueue = errors.New("The runner queue is full") + + WaitMemoryTimeout = 10 * time.Second +) + +func New(metricLogger Logger) (*Runner, error) { // TODO: Is this really required for Titan's driver? // Can we remove it? env := common.NewEnvironment(func(e *common.Environment) {}) @@ -40,17 +61,127 @@ func New() (*Runner, error) { return nil, err } - return &Runner{ - driver: driver, - }, nil + r := &Runner{ + driver: driver, + taskQueue: make(chan *containerTask, 100), + ml: metricLogger, + availableMem: getAvailableMemory(), + usedMem: 0, + } + + go r.queueHandler() + + return r, nil +} + +// This routine checks for available memory; +// If there's memory then send signal to the task to proceed. +// If there's not available memory to run the task it waits +// If the task waits for more than X seconds it timeouts +func (r *Runner) queueHandler() { + var task *containerTask + var waitStart time.Time + var waitTime time.Duration + var timedOut bool + for { + select { + case task = <-r.taskQueue: + waitStart = time.Now() + timedOut = false + } + + // Loop waiting for available memory + canRun := r.checkRequiredMem(task.cfg.Memory) + for ; !canRun; canRun = r.checkRequiredMem(task.cfg.Memory) { + waitTime = time.Since(waitStart) + if waitTime > WaitMemoryTimeout { + timedOut = true + break + } + time.Sleep(time.Microsecond) + } + + metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName) + r.ml.LogTime(task.ctx, metricBaseName+"waittime", waitTime) + + if timedOut { + // Send to a signal to this task saying it cannot run + r.ml.LogCount(task.ctx, metricBaseName+"timeout", 1) + task.canRun <- false + continue + } + + // Send a signal to this task saying it can run + task.canRun <- true + } +} + +func (r *Runner) checkRequiredMem(req uint64) bool { + r.usedMemMutex.RLock() + defer r.usedMemMutex.RUnlock() + return r.availableMem-r.usedMem/int64(req)*1024*1024 > 0 +} + +func (r *Runner) addUsedMem(used int64) { + r.usedMemMutex.Lock() + r.usedMem = r.usedMem + used*1024*1024 + if r.usedMem < 0 { + r.usedMem = 0 + } + r.usedMemMutex.Unlock() +} + +func (r *Runner) checkMemAndUse(req uint64) bool { + r.usedMemMutex.Lock() + defer r.usedMemMutex.Unlock() + + used := int64(req) * 1024 * 1024 + + if r.availableMem-r.usedMem/used < 0 { + return false + } + + r.usedMem += used + + return true } func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) { var err error + if cfg.Memory == 0 { + cfg.Memory = 128 + } + ctask := &containerTask{ - cfg: cfg, - auth: &agent.ConfigAuth{}, + ctx: ctx, + cfg: cfg, + auth: &agent.ConfigAuth{}, + canRun: make(chan bool), + } + + metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName) + r.ml.LogCount(ctx, metricBaseName+"requests", 1) + + // Check if has enough available memory + // If available, use it + if !r.checkMemAndUse(cfg.Memory) { + // If not, try add task to the queue + select { + case r.taskQueue <- ctask: + default: + // If queue is full, return error + r.ml.LogCount(ctx, "queue.full", 1) + return nil, ErrFullQueue + } + + // If task was added to the queue, wait for permission + if ok := <-ctask.canRun; !ok { + // This task timed out, not available memory + return nil, ErrTimeOutNoMemory + } + } else { + r.ml.LogTime(ctx, metricBaseName+"waittime", 0) } closer, err := r.driver.Prepare(ctx, ctask) @@ -59,11 +190,25 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error } defer closer.Close() + metricStart := time.Now() + result, err := r.driver.Run(ctx, ctask) if err != nil { return nil, err } + r.addUsedMem(-1 * int64(cfg.Memory)) + + if result.Status() == "success" { + r.ml.LogCount(ctx, metricBaseName+"succeeded", 1) + } else { + r.ml.LogCount(ctx, metricBaseName+"error", 1) + } + + metricElapsed := time.Since(metricStart) + r.ml.LogTime(ctx, metricBaseName+"time", metricElapsed) + r.ml.LogTime(ctx, "run.exec_time", metricElapsed) + return result, nil } @@ -90,3 +235,80 @@ func selectDriver(driver string, env *common.Environment, conf *driverscommon.Co } return nil, fmt.Errorf("driver %v not found", driver) } + +func getAvailableMemory() int64 { + const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB + + var availableMemory uint64 + if os.Getenv("IGNORE_MEMORY") == "1" { + availableMemory = tooBig + } else { + availableMemory, err := checkCgroup() + if err != nil { + logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..") + } + if availableMemory > tooBig || availableMemory == 0 { + // Then -m flag probably wasn't set, so use max available on system + availableMemory, err = checkProc() + if availableMemory > tooBig || availableMemory == 0 { + logrus.WithError(err).Fatal("Your Linux version is too old (<3.14) then we can't get the proper information to . You must specify the maximum available memory by passing the -m command with docker run when starting the runner via docker, eg: `docker run -m 2G ...`") + } + } + } + + return int64(availableMemory) +} + +func checkCgroup() (uint64, error) { + f, err := os.Open("/sys/fs/cgroup/memory/memory.limit_in_bytes") + if err != nil { + return 0, err + } + defer f.Close() + b, err := ioutil.ReadAll(f) + limBytes := string(b) + limBytes = strings.TrimSpace(limBytes) + if err != nil { + return 0, err + } + return strconv.ParseUint(limBytes, 10, 64) +} + +func checkProc() (uint64, error) { + f, err := os.Open("/proc/meminfo") + if err != nil { + return 0, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + b := scanner.Text() + if !strings.HasPrefix(b, "MemAvailable") { + continue + } + + // expect form: + // MemAvailable: 1234567890 kB + tri := strings.Fields(b) + if len(tri) != 3 { + return 0, fmt.Errorf("MemAvailable line has unexpected format: %v", b) + } + + c, err := strconv.ParseUint(tri[1], 10, 64) + if err != nil { + return 0, fmt.Errorf("Could not parse MemAvailable: %v", b) + } + switch tri[2] { // convert units to bytes + case "kB": + c *= 1024 + case "MB": + c *= 1024 * 1024 + default: + return 0, fmt.Errorf("Unexpected units for MemAvailable in /proc/meminfo, need kB or MB, got: %v", tri[2]) + } + return c, nil + } + + return 0, fmt.Errorf("Didn't find MemAvailable in /proc/meminfo, kernel is probably < 3.14") +} diff --git a/api/runner/runner_test.go b/api/runner/runner_test.go index 6f47df260..45b9858e6 100644 --- a/api/runner/runner_test.go +++ b/api/runner/runner_test.go @@ -11,7 +11,7 @@ import ( ) func TestRunnerHello(t *testing.T) { - runner, err := New() + runner, err := New(NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } @@ -60,7 +60,7 @@ func TestRunnerHello(t *testing.T) { } func TestRunnerError(t *testing.T) { - runner, err := New() + runner, err := New(NewMetricLogger()) if err != nil { t.Fatalf("Test error during New() - %s", err) } diff --git a/api/runner/task.go b/api/runner/task.go index 724446d43..a62187fc8 100644 --- a/api/runner/task.go +++ b/api/runner/task.go @@ -3,14 +3,18 @@ package runner import ( "io" + "golang.org/x/net/context" + dockercli "github.com/fsouza/go-dockerclient" "github.com/iron-io/titan/runner/drivers" "github.com/iron-io/titan/runner/tasker" ) type containerTask struct { - auth tasker.Auther - cfg *Config + ctx context.Context + auth tasker.Auther + cfg *Config + canRun chan bool } func (t *containerTask) Command() string { return "" } diff --git a/api/server/helpers.go b/api/server/helpers.go index 561c6f9a1..863f4748b 100644 --- a/api/server/helpers.go +++ b/api/server/helpers.go @@ -49,7 +49,7 @@ func testRouter() *gin.Engine { } func testRunner(t *testing.T) *runner.Runner { - r, err := runner.New() + r, err := runner.New(runner.NewMetricLogger()) if err != nil { t.Fatal("Test: failed to create new runner") } diff --git a/api/server/runner.go b/api/server/runner.go index aa95f60e4..7a120dd17 100644 --- a/api/server/runner.go +++ b/api/server/runner.go @@ -108,10 +108,6 @@ func handleRunner(c *gin.Context) { log = log.WithFields(logrus.Fields{ "app": appName, "route": el.Path, "image": el.Image, "request_id": reqID}) - // Request count metric - metricBaseName := "server.handleRunner." + appName + "." - runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1) - if params, match := matchRoute(el.Path, route); match { var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader @@ -152,9 +148,9 @@ func handleRunner(c *gin.Context) { Stdout: &stdout, Stderr: stderr, Env: envVars, + Memory: el.Memory, } - metricStart := time.Now() if result, err := Api.Runner.Run(c, cfg); err != nil { log.WithError(err).Error(models.ErrRunnerRunRoute) c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute)) @@ -165,20 +161,11 @@ func handleRunner(c *gin.Context) { if result.Status() == "success" { c.Data(http.StatusOK, "", stdout.Bytes()) - runner.LogMetricCount(ctx, (metricBaseName + "succeeded"), 1) - } else { - // log.WithFields(logrus.Fields{"app": appName, "route": el, "req_id": reqID}).Debug(stderr.String()) - // Error count metric - runner.LogMetricCount(ctx, (metricBaseName + "error"), 1) c.AbortWithStatus(http.StatusInternalServerError) } } - // Execution time metric - metricElapsed := time.Since(metricStart) - runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed) - runner.LogMetricTime(ctx, "server.handleRunner.exec_time", metricElapsed) return } } diff --git a/circle.yml b/circle.yml index 19bbe43d1..31a568632 100644 --- a/circle.yml +++ b/circle.yml @@ -1,18 +1,14 @@ machine: environment: - GOPATH: $HOME/go - GOROOT: $HOME/go - PATH: $GOROOT/bin:$HOME/bin:$PATH - GO15VENDOREXPERIMENT: 1 CHECKOUT_DIR: $HOME/$CIRCLE_PROJECT_REPONAME - GH_IRON: $GOROOT/src/github.com/iron-io + GOPATH: $HOME/go + GH_IRON: $GOPATH/src/github.com/iron-io GO_PROJECT: ../go/src/github.com/iron-io/$CIRCLE_PROJECT_REPONAME services: - docker checkout: post: - - echo "$GH_IRON" - mkdir -p "$GH_IRON" - cp -R "$CHECKOUT_DIR" "$GH_IRON/$CIRCLE_PROJECT_REPONAME" @@ -30,6 +26,5 @@ dependencies: test: override: - # Test entire project without diving into vendor dir. - - go test -v ./api/...: - pwd: $GO_PROJECT \ No newline at end of file + - ./test.sh: + pwd: $GO_PROJECT diff --git a/glide.lock b/glide.lock index 48c05c951..ab6e8e3f4 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 5ccf89905e13b7dc987cd203c1d7c1fddd32dcd776fe2e5f60d9f6f7b9908425 -updated: 2016-08-30T10:32:52.850687756-07:00 +hash: 3681d7248a9e90a7540f709e4844bbac6ae98806f0bdeb2f2945616655f78ad6 +updated: 2016-09-12T12:38:41.400655672-03:00 imports: - name: github.com/amir/raidman version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985 @@ -25,22 +25,22 @@ imports: - pkg/archive - pkg/fileutils - pkg/homedir - - pkg/ioutils + - pkg/stdcopy - pkg/parsers + - pkg/ulimit + - volume - pkg/pools - pkg/promise - - pkg/random - - pkg/stdcopy - pkg/system - - pkg/tarsum - - pkg/ulimit + - pkg/ioutils - pkg/units - - volume + - pkg/tarsum + - pkg/random - name: github.com/docker/engine-api version: 2f8c367944a28130f3c2fb9f0aad7f1d5db952a9 subpackages: - - types/mount - types/swarm + - types/mount - name: github.com/docker/go-units version: eb879ae3e2b84e2a142af415b679ddeda47ec71c - name: github.com/docker/libtrust @@ -52,8 +52,8 @@ imports: - name: github.com/garyburd/redigo version: 4ed1111375cbeb698249ffe48dd463e9b0a63a7a subpackages: - - internal - redis + - internal - name: github.com/gin-gonic/gin version: 4a6bc4aac4607e253bcda67c8c5bcda693d2388e subpackages: @@ -92,10 +92,10 @@ imports: subpackages: - hcl/ast - hcl/parser - - hcl/scanner - - hcl/strconv - hcl/token - json/parser + - hcl/scanner + - hcl/strconv - json/scanner - json/token - name: github.com/heroku/docker-registry-client @@ -104,20 +104,20 @@ imports: - registry - name: github.com/iron-io/titan version: 3bb6aacb244a24e38bba755cc863533810117a5c - repo: https://github.com/iron-io/titan.git + repo: git@github.com:iron-io/worker.git vcs: git subpackages: - common - - common/stats - runner/agent - runner/drivers - runner/drivers/docker - runner/drivers/mock - runner/tasker + - common/stats - runner/tasker/client/models - runner/tasker/client/titan - - runner/tasker/client/titan/groups - runner/tasker/client/titan/jobs + - runner/tasker/client/titan/groups - runner/tasker/client/titan/runner - name: github.com/kr/fs version: 2788f0dbd16903de03cb8186e5c7d97b69ad387b @@ -130,9 +130,9 @@ imports: - name: github.com/mailru/easyjson version: 34560e358dc05e2c28f6fda2f5c9e7494a4b9b19 subpackages: - - buffer - jlexer - jwriter + - buffer - name: github.com/manucorporat/sse version: ee05b128a739a0fb76c7ebd3ae4810c1de808d6d - name: github.com/mitchellh/mapstructure @@ -175,19 +175,18 @@ imports: - name: github.com/spf13/viper version: 7fb2782df3d83e0036cc89f461ed0422628776f4 - name: golang.org/x/crypto - version: b13fc1fd382d01861b16b2e6474487d3d4d27f20 + version: c10c31b5e94b6f7a0283272dc2bb27163dcea24b subpackages: + - ssh - curve25519 - ed25519 - - ed25519/internal/edwards25519 - - ssh - name: golang.org/x/net version: f315505cf3349909cdf013ea56690da34e96a451 subpackages: - context - context/ctxhttp - - idna - proxy + - idna - name: golang.org/x/sys version: a646d33e2ee3172a661fc09bca23bb4889a41bc8 subpackages: @@ -195,16 +194,16 @@ imports: - name: golang.org/x/text version: d69c40b4be55797923cec7457fac7a244d91a9b6 subpackages: + - transform + - unicode/norm + - secure/precis - cases - - internal/tag - - language - runes - secure/bidirule - - secure/precis - - transform - - unicode/bidi - - unicode/norm - width + - language + - unicode/bidi + - internal/tag - name: gopkg.in/go-playground/validator.v8 version: c193cecd124b5cc722d7ee5538e945bdb3348435 - name: gopkg.in/yaml.v2 diff --git a/glide.yaml b/glide.yaml index d37e13af1..cfa53498f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -7,7 +7,7 @@ import: - package: github.com/go-openapi/strfmt - package: github.com/go-openapi/validate - package: github.com/iron-io/titan - repo: https://github.com/iron-io/titan.git + repo: git@github.com:iron-io/worker.git vcs: git version: master - package: github.com/lib/pq diff --git a/main.go b/main.go index 3969aee38..f396ff858 100644 --- a/main.go +++ b/main.go @@ -20,7 +20,8 @@ func main() { log.WithError(err).Fatalln("Invalid DB url.") } - runner, err := runner.New() + metricLogger := runner.NewMetricLogger() + runner, err := runner.New(metricLogger) if err != nil { log.WithError(err).Fatalln("Failed to create a runner") } diff --git a/test.sh b/test.sh new file mode 100755 index 000000000..d090503f3 --- /dev/null +++ b/test.sh @@ -0,0 +1,7 @@ +export GO15VENDOREXPERIMENT=1 + +docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \ + -e IGNORE_MEMORY=1 \ + -e LOG_LEVEL=debug \ + -e GOPATH="$PWD/../../../.." \ + -v "$PWD":"$PWD" -w "$PWD" iron/go-dind go test -v $(go list ./... | grep -v /vendor/ | grep -v /examples/) \ No newline at end of file