Merge pull request #93 from pedronasser/runner-queue

Runner queue
This commit is contained in:
Pedro Nasser
2016-09-17 12:20:26 -03:00
committed by GitHub
13 changed files with 326 additions and 75 deletions

View File

@@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS routes (
app_name character varying(256) NOT NULL,
path text NOT NULL,
image character varying(256) NOT NULL,
memory integer NOT NULL,
headers text NOT NULL,
config text NOT NULL,
PRIMARY KEY (app_name, path)
@@ -31,7 +32,7 @@ const extrasTableCreate = `CREATE TABLE IF NOT EXISTS extras (
value character varying(256) NOT NULL
);`
const routeSelector = `SELECT app_name, path, image, headers, config FROM routes`
const routeSelector = `SELECT app_name, path, image, memory, headers, config FROM routes`
type rowScanner interface {
Scan(dest ...interface{}) error
@@ -182,6 +183,7 @@ func (ds *PostgresDatastore) StoreRoute(route *models.Route) (*models.Route, err
app_name,
path,
image,
memory,
headers,
config
)
@@ -189,12 +191,14 @@ func (ds *PostgresDatastore) StoreRoute(route *models.Route) (*models.Route, err
ON CONFLICT (app_name, path) DO UPDATE SET
path = $2,
image = $3,
headers = $4,
config = $5;
memory = $4,
headers = $5,
config = $6;
`,
route.AppName,
route.Path,
route.Image,
route.Memory,
string(hbyte),
string(cbyte),
)
@@ -225,6 +229,7 @@ func scanRoute(scanner rowScanner, route *models.Route) error {
&route.AppName,
&route.Path,
&route.Image,
&route.Memory,
&headerStr,
&configStr,
)

View File

@@ -24,6 +24,7 @@ type Route struct {
AppName string `json:"appname,omitempty"`
Path string `json:"path,omitempty"`
Image string `json:"image,omitempty"`
Memory uint64 `json:"memory,omitempty"`
Headers http.Header `json:"headers,omitempty"`
Config `json:"config"`
}
@@ -43,6 +44,10 @@ func (r *Route) Validate() error {
res = append(res, ErrRoutesValidationMissingImage)
}
if r.Memory == 0 {
r.Memory = 128
}
if r.AppName == "" {
res = append(res, ErrRoutesValidationMissingAppName)
}

View File

@@ -8,20 +8,46 @@ import (
"golang.org/x/net/context"
)
func LogMetric(ctx context.Context, name string, metricType string, value interface{}) {
type Logger interface {
Log(context.Context, map[string]interface{})
LogCount(context.Context, string, int)
LogGauge(context.Context, string, int)
LogTime(context.Context, string, time.Duration)
}
type Metric map[string]interface{}
func NewMetricLogger() *MetricLogger {
return &MetricLogger{}
}
type MetricLogger struct{}
func (l *MetricLogger) Log(ctx context.Context, metric map[string]interface{}) {
log := titancommon.Logger(ctx)
log.WithFields(logrus.Fields{
"metric": name, "type": metricType, "value": value}).Info()
log.WithFields(logrus.Fields(metric)).Info()
}
func LogMetricGauge(ctx context.Context, name string, value int) {
LogMetric(ctx, name, "gauge", value)
func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "count",
})
}
func LogMetricCount(ctx context.Context, name string, value int) {
LogMetric(ctx, name, "count", value)
func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Duration) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "time",
})
}
func LogMetricTime(ctx context.Context, name string, time time.Duration) {
LogMetric(ctx, name, "time", time)
func (l *MetricLogger) LogGauge(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "gauge",
})
}

View File

@@ -1,12 +1,20 @@
package runner
import (
"bufio"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/iron-io/titan/common"
"github.com/iron-io/titan/runner/agent"
"github.com/iron-io/titan/runner/drivers"
@@ -20,16 +28,29 @@ type Config struct {
Image string
Timeout time.Duration
AppName string
Memory uint64
Env map[string]string
Stdout io.Writer
Stderr io.Writer
}
type Runner struct {
driver drivers.Driver
driver drivers.Driver
taskQueue chan *containerTask
ml Logger
availableMem int64
usedMem int64
usedMemMutex sync.RWMutex
}
func New() (*Runner, error) {
var (
ErrTimeOutNoMemory = errors.New("Task timed out. No available memory.")
ErrFullQueue = errors.New("The runner queue is full")
WaitMemoryTimeout = 10 * time.Second
)
func New(metricLogger Logger) (*Runner, error) {
// TODO: Is this really required for Titan's driver?
// Can we remove it?
env := common.NewEnvironment(func(e *common.Environment) {})
@@ -40,17 +61,127 @@ func New() (*Runner, error) {
return nil, err
}
return &Runner{
driver: driver,
}, nil
r := &Runner{
driver: driver,
taskQueue: make(chan *containerTask, 100),
ml: metricLogger,
availableMem: getAvailableMemory(),
usedMem: 0,
}
go r.queueHandler()
return r, nil
}
// This routine checks for available memory;
// If there's memory then send signal to the task to proceed.
// If there's not available memory to run the task it waits
// If the task waits for more than X seconds it timeouts
func (r *Runner) queueHandler() {
var task *containerTask
var waitStart time.Time
var waitTime time.Duration
var timedOut bool
for {
select {
case task = <-r.taskQueue:
waitStart = time.Now()
timedOut = false
}
// Loop waiting for available memory
canRun := r.checkRequiredMem(task.cfg.Memory)
for ; !canRun; canRun = r.checkRequiredMem(task.cfg.Memory) {
waitTime = time.Since(waitStart)
if waitTime > WaitMemoryTimeout {
timedOut = true
break
}
time.Sleep(time.Microsecond)
}
metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName)
r.ml.LogTime(task.ctx, metricBaseName+"waittime", waitTime)
if timedOut {
// Send to a signal to this task saying it cannot run
r.ml.LogCount(task.ctx, metricBaseName+"timeout", 1)
task.canRun <- false
continue
}
// Send a signal to this task saying it can run
task.canRun <- true
}
}
func (r *Runner) checkRequiredMem(req uint64) bool {
r.usedMemMutex.RLock()
defer r.usedMemMutex.RUnlock()
return r.availableMem-r.usedMem/int64(req)*1024*1024 > 0
}
func (r *Runner) addUsedMem(used int64) {
r.usedMemMutex.Lock()
r.usedMem = r.usedMem + used*1024*1024
if r.usedMem < 0 {
r.usedMem = 0
}
r.usedMemMutex.Unlock()
}
func (r *Runner) checkMemAndUse(req uint64) bool {
r.usedMemMutex.Lock()
defer r.usedMemMutex.Unlock()
used := int64(req) * 1024 * 1024
if r.availableMem-r.usedMem/used < 0 {
return false
}
r.usedMem += used
return true
}
func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) {
var err error
if cfg.Memory == 0 {
cfg.Memory = 128
}
ctask := &containerTask{
cfg: cfg,
auth: &agent.ConfigAuth{},
ctx: ctx,
cfg: cfg,
auth: &agent.ConfigAuth{},
canRun: make(chan bool),
}
metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName)
r.ml.LogCount(ctx, metricBaseName+"requests", 1)
// Check if has enough available memory
// If available, use it
if !r.checkMemAndUse(cfg.Memory) {
// If not, try add task to the queue
select {
case r.taskQueue <- ctask:
default:
// If queue is full, return error
r.ml.LogCount(ctx, "queue.full", 1)
return nil, ErrFullQueue
}
// If task was added to the queue, wait for permission
if ok := <-ctask.canRun; !ok {
// This task timed out, not available memory
return nil, ErrTimeOutNoMemory
}
} else {
r.ml.LogTime(ctx, metricBaseName+"waittime", 0)
}
closer, err := r.driver.Prepare(ctx, ctask)
@@ -59,11 +190,25 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
}
defer closer.Close()
metricStart := time.Now()
result, err := r.driver.Run(ctx, ctask)
if err != nil {
return nil, err
}
r.addUsedMem(-1 * int64(cfg.Memory))
if result.Status() == "success" {
r.ml.LogCount(ctx, metricBaseName+"succeeded", 1)
} else {
r.ml.LogCount(ctx, metricBaseName+"error", 1)
}
metricElapsed := time.Since(metricStart)
r.ml.LogTime(ctx, metricBaseName+"time", metricElapsed)
r.ml.LogTime(ctx, "run.exec_time", metricElapsed)
return result, nil
}
@@ -90,3 +235,80 @@ func selectDriver(driver string, env *common.Environment, conf *driverscommon.Co
}
return nil, fmt.Errorf("driver %v not found", driver)
}
func getAvailableMemory() int64 {
const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB
var availableMemory uint64
if os.Getenv("IGNORE_MEMORY") == "1" {
availableMemory = tooBig
} else {
availableMemory, err := checkCgroup()
if err != nil {
logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..")
}
if availableMemory > tooBig || availableMemory == 0 {
// Then -m flag probably wasn't set, so use max available on system
availableMemory, err = checkProc()
if availableMemory > tooBig || availableMemory == 0 {
logrus.WithError(err).Fatal("Your Linux version is too old (<3.14) then we can't get the proper information to . You must specify the maximum available memory by passing the -m command with docker run when starting the runner via docker, eg: `docker run -m 2G ...`")
}
}
}
return int64(availableMemory)
}
func checkCgroup() (uint64, error) {
f, err := os.Open("/sys/fs/cgroup/memory/memory.limit_in_bytes")
if err != nil {
return 0, err
}
defer f.Close()
b, err := ioutil.ReadAll(f)
limBytes := string(b)
limBytes = strings.TrimSpace(limBytes)
if err != nil {
return 0, err
}
return strconv.ParseUint(limBytes, 10, 64)
}
func checkProc() (uint64, error) {
f, err := os.Open("/proc/meminfo")
if err != nil {
return 0, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
b := scanner.Text()
if !strings.HasPrefix(b, "MemAvailable") {
continue
}
// expect form:
// MemAvailable: 1234567890 kB
tri := strings.Fields(b)
if len(tri) != 3 {
return 0, fmt.Errorf("MemAvailable line has unexpected format: %v", b)
}
c, err := strconv.ParseUint(tri[1], 10, 64)
if err != nil {
return 0, fmt.Errorf("Could not parse MemAvailable: %v", b)
}
switch tri[2] { // convert units to bytes
case "kB":
c *= 1024
case "MB":
c *= 1024 * 1024
default:
return 0, fmt.Errorf("Unexpected units for MemAvailable in /proc/meminfo, need kB or MB, got: %v", tri[2])
}
return c, nil
}
return 0, fmt.Errorf("Didn't find MemAvailable in /proc/meminfo, kernel is probably < 3.14")
}

View File

@@ -11,7 +11,7 @@ import (
)
func TestRunnerHello(t *testing.T) {
runner, err := New()
runner, err := New(NewMetricLogger())
if err != nil {
t.Fatalf("Test error during New() - %s", err)
}
@@ -60,7 +60,7 @@ func TestRunnerHello(t *testing.T) {
}
func TestRunnerError(t *testing.T) {
runner, err := New()
runner, err := New(NewMetricLogger())
if err != nil {
t.Fatalf("Test error during New() - %s", err)
}

View File

@@ -3,14 +3,18 @@ package runner
import (
"io"
"golang.org/x/net/context"
dockercli "github.com/fsouza/go-dockerclient"
"github.com/iron-io/titan/runner/drivers"
"github.com/iron-io/titan/runner/tasker"
)
type containerTask struct {
auth tasker.Auther
cfg *Config
ctx context.Context
auth tasker.Auther
cfg *Config
canRun chan bool
}
func (t *containerTask) Command() string { return "" }

View File

@@ -49,7 +49,7 @@ func testRouter() *gin.Engine {
}
func testRunner(t *testing.T) *runner.Runner {
r, err := runner.New()
r, err := runner.New(runner.NewMetricLogger())
if err != nil {
t.Fatal("Test: failed to create new runner")
}

View File

@@ -108,10 +108,6 @@ func handleRunner(c *gin.Context) {
log = log.WithFields(logrus.Fields{
"app": appName, "route": el.Path, "image": el.Image, "request_id": reqID})
// Request count metric
metricBaseName := "server.handleRunner." + appName + "."
runner.LogMetricCount(ctx, (metricBaseName + "requests"), 1)
if params, match := matchRoute(el.Path, route); match {
var stdout bytes.Buffer // TODO: should limit the size of this, error if gets too big. akin to: https://golang.org/pkg/io/#LimitReader
@@ -152,9 +148,9 @@ func handleRunner(c *gin.Context) {
Stdout: &stdout,
Stderr: stderr,
Env: envVars,
Memory: el.Memory,
}
metricStart := time.Now()
if result, err := Api.Runner.Run(c, cfg); err != nil {
log.WithError(err).Error(models.ErrRunnerRunRoute)
c.JSON(http.StatusInternalServerError, simpleError(models.ErrRunnerRunRoute))
@@ -165,20 +161,11 @@ func handleRunner(c *gin.Context) {
if result.Status() == "success" {
c.Data(http.StatusOK, "", stdout.Bytes())
runner.LogMetricCount(ctx, (metricBaseName + "succeeded"), 1)
} else {
// log.WithFields(logrus.Fields{"app": appName, "route": el, "req_id": reqID}).Debug(stderr.String())
// Error count metric
runner.LogMetricCount(ctx, (metricBaseName + "error"), 1)
c.AbortWithStatus(http.StatusInternalServerError)
}
}
// Execution time metric
metricElapsed := time.Since(metricStart)
runner.LogMetricTime(ctx, (metricBaseName + "time"), metricElapsed)
runner.LogMetricTime(ctx, "server.handleRunner.exec_time", metricElapsed)
return
}
}

View File

@@ -1,18 +1,14 @@
machine:
environment:
GOPATH: $HOME/go
GOROOT: $HOME/go
PATH: $GOROOT/bin:$HOME/bin:$PATH
GO15VENDOREXPERIMENT: 1
CHECKOUT_DIR: $HOME/$CIRCLE_PROJECT_REPONAME
GH_IRON: $GOROOT/src/github.com/iron-io
GOPATH: $HOME/go
GH_IRON: $GOPATH/src/github.com/iron-io
GO_PROJECT: ../go/src/github.com/iron-io/$CIRCLE_PROJECT_REPONAME
services:
- docker
checkout:
post:
- echo "$GH_IRON"
- mkdir -p "$GH_IRON"
- cp -R "$CHECKOUT_DIR" "$GH_IRON/$CIRCLE_PROJECT_REPONAME"
@@ -30,6 +26,5 @@ dependencies:
test:
override:
# Test entire project without diving into vendor dir.
- go test -v ./api/...:
pwd: $GO_PROJECT
- ./test.sh:
pwd: $GO_PROJECT

51
glide.lock generated
View File

@@ -1,5 +1,5 @@
hash: 5ccf89905e13b7dc987cd203c1d7c1fddd32dcd776fe2e5f60d9f6f7b9908425
updated: 2016-08-30T10:32:52.850687756-07:00
hash: 3681d7248a9e90a7540f709e4844bbac6ae98806f0bdeb2f2945616655f78ad6
updated: 2016-09-12T12:38:41.400655672-03:00
imports:
- name: github.com/amir/raidman
version: c74861fe6a7bb8ede0a010ce4485bdbb4fc4c985
@@ -25,22 +25,22 @@ imports:
- pkg/archive
- pkg/fileutils
- pkg/homedir
- pkg/ioutils
- pkg/stdcopy
- pkg/parsers
- pkg/ulimit
- volume
- pkg/pools
- pkg/promise
- pkg/random
- pkg/stdcopy
- pkg/system
- pkg/tarsum
- pkg/ulimit
- pkg/ioutils
- pkg/units
- volume
- pkg/tarsum
- pkg/random
- name: github.com/docker/engine-api
version: 2f8c367944a28130f3c2fb9f0aad7f1d5db952a9
subpackages:
- types/mount
- types/swarm
- types/mount
- name: github.com/docker/go-units
version: eb879ae3e2b84e2a142af415b679ddeda47ec71c
- name: github.com/docker/libtrust
@@ -52,8 +52,8 @@ imports:
- name: github.com/garyburd/redigo
version: 4ed1111375cbeb698249ffe48dd463e9b0a63a7a
subpackages:
- internal
- redis
- internal
- name: github.com/gin-gonic/gin
version: 4a6bc4aac4607e253bcda67c8c5bcda693d2388e
subpackages:
@@ -92,10 +92,10 @@ imports:
subpackages:
- hcl/ast
- hcl/parser
- hcl/scanner
- hcl/strconv
- hcl/token
- json/parser
- hcl/scanner
- hcl/strconv
- json/scanner
- json/token
- name: github.com/heroku/docker-registry-client
@@ -104,20 +104,20 @@ imports:
- registry
- name: github.com/iron-io/titan
version: 3bb6aacb244a24e38bba755cc863533810117a5c
repo: https://github.com/iron-io/titan.git
repo: git@github.com:iron-io/worker.git
vcs: git
subpackages:
- common
- common/stats
- runner/agent
- runner/drivers
- runner/drivers/docker
- runner/drivers/mock
- runner/tasker
- common/stats
- runner/tasker/client/models
- runner/tasker/client/titan
- runner/tasker/client/titan/groups
- runner/tasker/client/titan/jobs
- runner/tasker/client/titan/groups
- runner/tasker/client/titan/runner
- name: github.com/kr/fs
version: 2788f0dbd16903de03cb8186e5c7d97b69ad387b
@@ -130,9 +130,9 @@ imports:
- name: github.com/mailru/easyjson
version: 34560e358dc05e2c28f6fda2f5c9e7494a4b9b19
subpackages:
- buffer
- jlexer
- jwriter
- buffer
- name: github.com/manucorporat/sse
version: ee05b128a739a0fb76c7ebd3ae4810c1de808d6d
- name: github.com/mitchellh/mapstructure
@@ -175,19 +175,18 @@ imports:
- name: github.com/spf13/viper
version: 7fb2782df3d83e0036cc89f461ed0422628776f4
- name: golang.org/x/crypto
version: b13fc1fd382d01861b16b2e6474487d3d4d27f20
version: c10c31b5e94b6f7a0283272dc2bb27163dcea24b
subpackages:
- ssh
- curve25519
- ed25519
- ed25519/internal/edwards25519
- ssh
- name: golang.org/x/net
version: f315505cf3349909cdf013ea56690da34e96a451
subpackages:
- context
- context/ctxhttp
- idna
- proxy
- idna
- name: golang.org/x/sys
version: a646d33e2ee3172a661fc09bca23bb4889a41bc8
subpackages:
@@ -195,16 +194,16 @@ imports:
- name: golang.org/x/text
version: d69c40b4be55797923cec7457fac7a244d91a9b6
subpackages:
- transform
- unicode/norm
- secure/precis
- cases
- internal/tag
- language
- runes
- secure/bidirule
- secure/precis
- transform
- unicode/bidi
- unicode/norm
- width
- language
- unicode/bidi
- internal/tag
- name: gopkg.in/go-playground/validator.v8
version: c193cecd124b5cc722d7ee5538e945bdb3348435
- name: gopkg.in/yaml.v2

View File

@@ -7,7 +7,7 @@ import:
- package: github.com/go-openapi/strfmt
- package: github.com/go-openapi/validate
- package: github.com/iron-io/titan
repo: https://github.com/iron-io/titan.git
repo: git@github.com:iron-io/worker.git
vcs: git
version: master
- package: github.com/lib/pq

View File

@@ -20,7 +20,8 @@ func main() {
log.WithError(err).Fatalln("Invalid DB url.")
}
runner, err := runner.New()
metricLogger := runner.NewMetricLogger()
runner, err := runner.New(metricLogger)
if err != nil {
log.WithError(err).Fatalln("Failed to create a runner")
}

7
test.sh Executable file
View File

@@ -0,0 +1,7 @@
export GO15VENDOREXPERIMENT=1
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
-e IGNORE_MEMORY=1 \
-e LOG_LEVEL=debug \
-e GOPATH="$PWD/../../../.." \
-v "$PWD":"$PWD" -w "$PWD" iron/go-dind go test -v $(go list ./... | grep -v /vendor/ | grep -v /examples/)