improvements

Pedro Nasser
2016-09-13 23:22:00 -03:00
parent b21029c3e3
commit da1746dc97
2 changed files with 95 additions and 30 deletions

View File

@@ -1,6 +1,8 @@
package runner
import (
"time"
"github.com/Sirupsen/logrus"
titancommon "github.com/iron-io/titan/common"
"golang.org/x/net/context"
@@ -8,6 +10,9 @@ import (
type Logger interface {
Log(context.Context, map[string]interface{})
LogCount(context.Context, string, int)
LogGauge(context.Context, string, int)
LogTime(context.Context, string, time.Duration)
}
type Metric map[string]interface{}
@@ -22,3 +27,27 @@ func (l *MetricLogger) Log(ctx context.Context, metric map[string]interface{}) {
log := titancommon.Logger(ctx)
log.WithFields(logrus.Fields(metric)).Info()
}
func (l *MetricLogger) LogCount(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "count",
})
}
func (l *MetricLogger) LogTime(ctx context.Context, name string, value time.Duration) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "time",
})
}
func (l *MetricLogger) LogGauge(ctx context.Context, name string, value int) {
l.Log(ctx, Metric{
"name": name,
"value": value,
"type": "gauge",
})
}
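
The three new helpers wrap Log with a fixed "type" field, so call sites no longer assemble Metric maps by hand. A minimal sketch of how a caller might use them (the metric names are made up for illustration, ctx is assumed to carry the titan logger, and MetricLogger is assumed to need no configuration):

// Illustrative only; not part of this commit.
var ml Logger = &MetricLogger{}
ml.LogCount(ctx, "run.myapp.requests", 1)                   // emits {"name": ..., "type": "count", "value": 1}
ml.LogGauge(ctx, "run.myapp.queued", 3)                     // emits {"type": "gauge", ...}
ml.LogTime(ctx, "run.myapp.waittime", 150*time.Millisecond) // emits {"type": "time", ...}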

View File

@@ -9,6 +9,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/net/context"
@@ -34,9 +35,12 @@ type Config struct {
}
type Runner struct {
-driver drivers.Driver
-taskQueue chan *containerTask
-ml Logger
+driver drivers.Driver
+taskQueue chan *containerTask
+ml Logger
+availableMem int64
+usedMem int64
+usedMemMutex sync.RWMutex
}
var (
@@ -58,9 +62,11 @@ func New(metricLogger Logger) (*Runner, error) {
}
r := &Runner{
-driver: driver,
-taskQueue: make(chan *containerTask, 100),
-ml: metricLogger,
+driver: driver,
+taskQueue: make(chan *containerTask, 100),
+ml: metricLogger,
+availableMem: getAvailableMemory(),
+usedMem: 0,
}
go r.queueHandler()
@@ -85,21 +91,22 @@ func (r *Runner) queueHandler() {
}
// Loop waiting for available memory
-avail := dynamicSizing(task.cfg.Memory)
-for ; avail == 0; avail = dynamicSizing(task.cfg.Memory) {
+canRun := r.checkRequiredMem(task.cfg.Memory)
+for ; !canRun; canRun = r.checkRequiredMem(task.cfg.Memory) {
waitTime = time.Since(waitStart)
if waitTime > WaitMemoryTimeout {
timedOut = true
break
}
time.Sleep(time.Second)
}
metricBaseName := fmt.Sprintf("run.%s.", task.cfg.AppName)
-r.ml.Log(task.ctx, Metric{"name": (metricBaseName + "waittime"), "type": "time", "value": waitTime})
+r.ml.LogTime(task.ctx, metricBaseName+"waittime", waitTime)
if timedOut {
// Send a signal to this task saying it cannot run
-r.ml.Log(task.ctx, Metric{"name": (metricBaseName + "timeout"), "type": "count", "value": 1})
+r.ml.LogCount(task.ctx, metricBaseName+"timeout", 1)
task.canRun <- false
continue
}
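
The handler now polls checkRequiredMem once per second instead of recomputing dynamicSizing, and gives up after WaitMemoryTimeout. Note that waitTime is only updated inside the loop, so a task that fits on the first check logs a wait time of 0, matching the LogTime(..., 0) call in Run further down. A condensed, equivalent form of the loop (same names as the hunk; sketch only, not the committed code):

// Equivalent rewrite of the polling loop above.
canRun := r.checkRequiredMem(task.cfg.Memory)
for !canRun {
	waitTime = time.Since(waitStart)
	if waitTime > WaitMemoryTimeout {
		timedOut = true
		break
	}
	time.Sleep(time.Second)
	canRun = r.checkRequiredMem(task.cfg.Memory)
}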
@@ -109,6 +116,36 @@ func (r *Runner) queueHandler() {
}
}
func (r *Runner) checkRequiredMem(req uint64) bool {
r.usedMemMutex.RLock()
defer r.usedMemMutex.RUnlock()
return r.availableMem-r.usedMem/int64(req)*1024*1024 > 0
}
func (r *Runner) addUsedMem(used int64) {
r.usedMemMutex.Lock()
r.usedMem = r.usedMem + used*1024*1024
if r.usedMem < 0 {
r.usedMem = 0
}
r.usedMemMutex.Unlock()
}
func (r *Runner) checkMemAndUse(req uint64) bool {
r.usedMemMutex.Lock()
defer r.usedMemMutex.Unlock()
used := int64(req) * 1024 * 1024
if r.availableMem-r.usedMem/used < 0 {
return false
}
r.usedMem += used
return true
}
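
In these helpers the request (req) is expressed in MB while availableMem and usedMem are byte counters, hence the *1024*1024 conversions. Note that, as written, Go's operator precedence evaluates r.usedMem/int64(req)*1024*1024 (and likewise r.usedMem/used) as a division before the subtraction. The apparent intent, checking whether the requested allocation still fits next to what is already used, would look roughly like the hypothetical helper below (hasRoomFor is not part of the commit):

// Hypothetical helper showing the apparent intent of checkRequiredMem /
// checkMemAndUse, with explicit parentheses; reqMB is the task's memory in MB.
func (r *Runner) hasRoomFor(reqMB uint64) bool {
	r.usedMemMutex.RLock()
	defer r.usedMemMutex.RUnlock()
	reqBytes := int64(reqMB) * 1024 * 1024
	return r.availableMem-(r.usedMem+reqBytes) >= 0
}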
func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error) {
var err error
@@ -120,22 +157,17 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
}
metricBaseName := fmt.Sprintf("run.%s.", cfg.AppName)
-r.ml.Log(ctx, Metric{"name": (metricBaseName + "requests"), "type": "count", "value": 1})
-closer, err := r.driver.Prepare(ctx, ctask)
-if err != nil {
-return nil, err
-}
-defer closer.Close()
+r.ml.LogCount(ctx, metricBaseName+"requests", 1)
// Check if there is enough available memory
-if dynamicSizing(cfg.Memory) == 0 {
+// If available, use it
+if !r.checkMemAndUse(cfg.Memory) {
// If not, try to add the task to the queue
select {
case r.taskQueue <- ctask:
default:
// If queue is full, return error
-r.ml.Log(ctx, Metric{"name": "queue.full", "type": "count", "value": 1})
+r.ml.LogCount(ctx, "queue.full", 1)
return nil, ErrFullQueue
}
@@ -145,23 +177,32 @@ func (r *Runner) Run(ctx context.Context, cfg *Config) (drivers.RunResult, error
return nil, ErrTimeOutNoMemory
}
} else {
-r.ml.Log(ctx, Metric{"name": (metricBaseName + "waittime"), "type": "time", "value": 0})
+r.ml.LogTime(ctx, metricBaseName+"waittime", 0)
}
+closer, err := r.driver.Prepare(ctx, ctask)
+if err != nil {
+return nil, err
+}
+defer closer.Close()
metricStart := time.Now()
result, err := r.driver.Run(ctx, ctask)
if err != nil {
return nil, err
}
+r.addUsedMem(-1 * int64(cfg.Memory))
if result.Status() == "success" {
-r.ml.Log(ctx, Metric{"name": (metricBaseName + "succeeded"), "type": "count", "value": 1})
+r.ml.LogCount(ctx, metricBaseName+"succeeded", 1)
} else {
-r.ml.Log(ctx, Metric{"name": (metricBaseName + "error"), "type": "count", "value": 1})
+r.ml.LogCount(ctx, metricBaseName+"error", 1)
}
metricElapsed := time.Since(metricStart)
-r.ml.Log(ctx, Metric{"name": (metricBaseName + "time"), "type": "time", "value": metricElapsed})
+r.ml.LogTime(ctx, metricBaseName+"time", metricElapsed)
return result, nil
}
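
One consequence of the flow above: checkMemAndUse reserves cfg.Memory up front, but the matching release is the addUsedMem(-1 * ...) call placed after the driver has run, so the early returns on Prepare or Run errors leave the reservation counted as used (the queued path is only partly visible in this diff). A hedged alternative, assuming the reservation should be returned on every exit once it has been taken:

// Sketch only, using the names from Run above: once memory has been
// reserved, a deferred release also covers the error returns from
// Prepare and Run, not just the normal path.
defer r.addUsedMem(-1 * int64(cfg.Memory))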
@@ -190,11 +231,8 @@ func selectDriver(driver string, env *common.Environment, conf *driverscommon.Co
return nil, fmt.Errorf("driver %v not found", driver)
}
-func dynamicSizing(reqMem uint64) int {
+func getAvailableMemory() int64 {
const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB
-if reqMem == 0 {
-reqMem = 128
-}
var availableMemory uint64
if os.Getenv("IGNORE_MEMORY") == "1" {
@@ -213,9 +251,7 @@ func dynamicSizing(reqMem uint64) int {
}
}
-c := availableMemory / (reqMem * 1024 * 1024)
-return int(c)
+return int64(availableMemory)
}
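
getAvailableMemory now returns the machine's usable memory as a byte count, cached once in New(); the per-request division that dynamicSizing used to do happens, in spirit, at the call sites instead. Illustrative numbers only (not from the commit):

// Example figures: an 8 GiB host running tasks that each request 256 MB.
availableMem := int64(8) * 1024 * 1024 * 1024 // what getAvailableMemory might return
reqBytes := int64(256) * 1024 * 1024          // a task's cfg.Memory converted to bytes
fmt.Println(availableMem / reqBytes)          // 32 such tasks fit before the runner has to queue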
func checkCgroup() (uint64, error) {