diff --git a/api/agent/agent.go b/api/agent/agent.go index 3e82c8546..a7b4a0f20 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -338,7 +338,7 @@ func (a *agent) hotLauncher(ctx context.Context, callObj *call) { }).Info("Hot function launcher starting hot container") select { - case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, isAsync): + case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, uint64(callObj.CPUs), isAsync): cancel() if isOpen { a.wg.Add(1) @@ -404,7 +404,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { ch := make(chan Slot) select { - case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, isAsync): + case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): if !isOpen { return nil, models.ErrCallTimeoutServerBusy } @@ -544,6 +544,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch image: call.Image, env: map[string]string(call.Config), memory: call.Memory, + cpus: uint64(call.CPUs), timeout: time.Duration(call.Timeout) * time.Second, // this is unnecessary, but in case removal fails... stdin: call.req.Body, stdout: call.w, @@ -604,12 +605,13 @@ func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) { image: call.Image, env: map[string]string(call.Config), memory: call.Memory, + cpus: uint64(call.CPUs), stdin: stdinRead, stdout: stdoutWrite, stderr: &ghostWriter{inner: stderr}, } - logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "format": call.Format, "idle_timeout": call.IdleTimeout}) + logger := logrus.WithFields(logrus.Fields{"id": container.id, "app": call.AppName, "route": call.Path, "image": call.Image, "memory": call.Memory, "cpus": call.CPUs, "format": call.Format, "idle_timeout": call.IdleTimeout}) ctx = common.WithLogger(ctx, logger) cookie, err := a.driver.Prepare(ctx, container) @@ -698,6 +700,7 @@ type container struct { image string env map[string]string memory uint64 + cpus uint64 timeout time.Duration // cold only (superfluous, but in case) stdin io.Reader @@ -735,6 +738,7 @@ func (c *container) Image() string { return c.image } func (c *container) Timeout() time.Duration { return c.timeout } func (c *container) EnvVars() map[string]string { return c.env } func (c *container) Memory() uint64 { return c.memory * 1024 * 1024 } // convert MB +func (c *container) CPUs() uint64 { return c.cpus } // WriteStat publishes each metric in the specified Stats structure as a histogram metric func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) { diff --git a/api/agent/agent_test.go b/api/agent/agent_test.go index 7f05c629c..7c5e6109e 100644 --- a/api/agent/agent_test.go +++ b/api/agent/agent_test.go @@ -185,6 +185,7 @@ func TestCallConfigurationModel(t *testing.T) { const timeout = 1 const idleTimeout = 20 const memory = 256 + CPUs := models.MilliCPUs(1000) method := "GET" url := "http://127.0.0.1:8080/r/" + appName + path payload := "payload" @@ -195,6 +196,7 @@ func TestCallConfigurationModel(t *testing.T) { "FN_APP_NAME": appName, "FN_PATH": path, "FN_MEMORY": strconv.Itoa(memory), + "FN_CPUS": CPUs.String(), "FN_TYPE": typ, "APP_VAR": "FOO", "ROUTE_VAR": "BAR", @@ -210,6 +212,7 @@ func TestCallConfigurationModel(t *testing.T) { Timeout: timeout, IdleTimeout: idleTimeout, Memory: memory, + CPUs: CPUs, Payload: payload, URL: url, Method: method, @@ 
-243,6 +246,7 @@ func TestAsyncCallHeaders(t *testing.T) { const timeout = 1 const idleTimeout = 20 const memory = 256 + CPUs := models.MilliCPUs(200) method := "GET" url := "http://127.0.0.1:8080/r/" + appName + path payload := "payload" @@ -255,6 +259,7 @@ func TestAsyncCallHeaders(t *testing.T) { "FN_APP_NAME": appName, "FN_PATH": path, "FN_MEMORY": strconv.Itoa(memory), + "FN_CPUS": CPUs.String(), "FN_TYPE": typ, "APP_VAR": "FOO", "ROUTE_VAR": "BAR", @@ -277,6 +282,7 @@ func TestAsyncCallHeaders(t *testing.T) { Timeout: timeout, IdleTimeout: idleTimeout, Memory: memory, + CPUs: CPUs, Payload: payload, URL: url, Method: method, @@ -343,6 +349,7 @@ func TestSubmitError(t *testing.T) { const timeout = 10 const idleTimeout = 20 const memory = 256 + CPUs := models.MilliCPUs(200) method := "GET" url := "http://127.0.0.1:8080/r/" + appName + path payload := "payload" @@ -353,6 +360,7 @@ func TestSubmitError(t *testing.T) { "FN_APP_NAME": appName, "FN_PATH": path, "FN_MEMORY": strconv.Itoa(memory), + "FN_CPUS": CPUs.String(), "FN_TYPE": typ, "APP_VAR": "FOO", "ROUTE_VAR": "BAR", @@ -369,6 +377,7 @@ func TestSubmitError(t *testing.T) { Timeout: timeout, IdleTimeout: idleTimeout, Memory: memory, + CPUs: CPUs, Payload: payload, URL: url, Method: method, diff --git a/api/agent/call.go b/api/agent/call.go index 0db11d1a7..b8a2b40f2 100644 --- a/api/agent/call.go +++ b/api/agent/call.go @@ -103,6 +103,7 @@ func FromRequest(appName, path string, req *http.Request) CallOpt { Timeout: route.Timeout, IdleTimeout: route.IdleTimeout, Memory: route.Memory, + CPUs: route.CPUs, Config: buildConfig(app, route), Headers: req.Header, CreatedAt: strfmt.DateTime(time.Now()), @@ -130,6 +131,11 @@ func buildConfig(app *models.App, route *models.Route) models.Config { // TODO: might be a good idea to pass in: "FN_BASE_PATH" = fmt.Sprintf("/r/%s", appName) || "/" if using DNS entries per app conf["FN_MEMORY"] = fmt.Sprintf("%d", route.Memory) conf["FN_TYPE"] = route.Type + + CPUs := route.CPUs.String() + if CPUs != "" { + conf["FN_CPUS"] = CPUs + } return conf } diff --git a/api/agent/drivers/docker/docker.go b/api/agent/drivers/docker/docker.go index e660ffe49..d845dd005 100644 --- a/api/agent/drivers/docker/docker.go +++ b/api/agent/drivers/docker/docker.go @@ -115,6 +115,15 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask Context: ctx, } + // Translate milli cpus into CPUQuota & CPUPeriod (see the Linux cgroups CFS (cgroup v1) documentation) + // eg: task.CPUs() of 8000 means CPUQuota of 8 * 100000 usecs in 100000 usec period, + // which is approx 8 CPUs in CFS world. 
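+ // For instance, a route configured with cpus="500m" (a hypothetical value, i.e. 500 milliCPUs) + // would get CPUQuota = 500 * 100 = 50000 usecs per 100000 usec CPUPeriod, i.e. half a CPU. 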
+ // Also see docker run options --cpu-quota and --cpu-period + if task.CPUs() != 0 { + container.HostConfig.CPUQuota = int64(task.CPUs() * 100) + container.HostConfig.CPUPeriod = 100000 + } + volumes := task.Volumes() for _, mapping := range volumes { hostDir := mapping[0] @@ -140,7 +149,7 @@ func (drv *DockerDriver) Prepare(ctx context.Context, task drivers.ContainerTask // since we retry under the hood, if the container gets created and retry fails, we can just ignore error if err != docker.ErrContainerAlreadyExists { log.WithFields(logrus.Fields{"call_id": task.Id(), "command": container.Config.Cmd, "memory": container.Config.Memory, - "cpu_shares": container.Config.CPUShares, "hostname": container.Config.Hostname, "name": container.Name, + "cpu_shares": container.Config.CPUShares, "cpus": task.CPUs(), "hostname": container.Config.Hostname, "name": container.Name, "image": container.Config.Image, "volumes": container.Config.Volumes, "binds": container.HostConfig.Binds, "container": container.Name, }).WithError(err).Error("Could not create container") diff --git a/api/agent/drivers/docker/docker_test.go b/api/agent/drivers/docker/docker_test.go index 10b083db5..b7a734078 100644 --- a/api/agent/drivers/docker/docker_test.go +++ b/api/agent/drivers/docker/docker_test.go @@ -31,6 +31,7 @@ func (f *taskDockerTest) Logger() (stdout, stderr io.Writer) { return f.out func (f *taskDockerTest) WriteStat(context.Context, drivers.Stat) { /* TODO */ } func (f *taskDockerTest) Volumes() [][2]string { return [][2]string{} } func (f *taskDockerTest) Memory() uint64 { return 256 * 1024 * 1024 } +func (f *taskDockerTest) CPUs() uint64 { return 0 } func (f *taskDockerTest) WorkDir() string { return "" } func (f *taskDockerTest) Close() {} func (f *taskDockerTest) Input() io.Reader { return f.input } diff --git a/api/agent/drivers/driver.go b/api/agent/drivers/driver.go index f711eaa23..3248bfdf8 100644 --- a/api/agent/drivers/driver.go +++ b/api/agent/drivers/driver.go @@ -104,6 +104,9 @@ type ContainerTask interface { // 0 is unlimited. Memory() uint64 + // CPUs in milliCPU units (1000 = 1 CPU); 0 is unlimited. + CPUs() uint64 + // WorkDir returns the working directory to use for the task. Empty string // leaves it unset. WorkDir() string diff --git a/api/agent/resource.go b/api/agent/resource.go index f1225a657..8e2498ee5 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -26,7 +26,7 @@ const ( type ResourceTracker interface { WaitAsyncResource() chan struct{} // returns a closed channel if the resource can never be met. 
- GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken + GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken } type resourceTracker struct { @@ -42,6 +42,17 @@ type resourceTracker struct { ramAsyncUsed uint64 // memory in use for async area in which agent stops dequeuing async jobs ramAsyncHWMark uint64 + + // cpuSyncTotal is the total usable cpu for sync functions + cpuSyncTotal uint64 + // cpuSyncUsed is cpu reserved for running sync containers including hot/idle + cpuSyncUsed uint64 + // cpuAsyncTotal is the total usable cpu for async + sync functions + cpuAsyncTotal uint64 + // cpuAsyncUsed is cpu reserved for running async + sync containers including hot/idle + cpuAsyncUsed uint64 + // cpu in use for async area in which agent stops dequeuing async jobs + cpuAsyncHWMark uint64 } func NewResourceTracker() ResourceTracker { @@ -51,6 +62,7 @@ func NewResourceTracker() ResourceTracker { } obj.initializeMemory() + obj.initializeCPU() return obj } @@ -60,46 +72,52 @@ type ResourceToken interface { } type resourceToken struct { + once sync.Once decrement func() } func (t *resourceToken) Close() error { - t.decrement() + t.once.Do(func() { + t.decrement() + }) return nil } -func (a *resourceTracker) isResourceAvailableLocked(memory uint64, isAsync bool) bool { +func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota uint64, isAsync bool) bool { - asyncAvail := a.ramAsyncTotal - a.ramAsyncUsed - syncAvail := a.ramSyncTotal - a.ramSyncUsed + asyncAvailMem := a.ramAsyncTotal - a.ramAsyncUsed + syncAvailMem := a.ramSyncTotal - a.ramSyncUsed + + asyncAvailCPU := a.cpuAsyncTotal - a.cpuAsyncUsed + syncAvailCPU := a.cpuSyncTotal - a.cpuSyncUsed // For sync functions, we can steal from the async pool. For async, we restrict it to the async pool if isAsync { - return asyncAvail >= memory + return asyncAvailMem >= memory && asyncAvailCPU >= cpuQuota } else { - return asyncAvail+syncAvail >= memory + return asyncAvailMem+syncAvailMem >= memory && asyncAvailCPU+syncAvailCPU >= cpuQuota } } // is this request possible to meet? If no, fail quick -func (a *resourceTracker) isResourcePossible(memory uint64, isAsync bool) bool { +func (a *resourceTracker) isResourcePossible(memory uint64, cpuQuota uint64, isAsync bool) bool { if isAsync { - return memory <= a.ramAsyncTotal + return memory <= a.ramAsyncTotal && cpuQuota <= a.cpuAsyncTotal } else { - return memory <= a.ramSyncTotal+a.ramAsyncTotal + return memory <= a.ramSyncTotal+a.ramAsyncTotal && cpuQuota <= a.cpuSyncTotal+a.cpuAsyncTotal } } // the received token should be passed directly to launch (unconditionally), launch // will close this token (i.e. 
the receiver should not call Close) -func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken { +func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken { memory = memory * Mem1MB c := a.cond ch := make(chan ResourceToken) - if !a.isResourcePossible(memory, isAsync) { + if !a.isResourcePossible(memory, cpuQuota, isAsync) { close(ch) return ch } @@ -107,39 +125,44 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, i go func() { c.L.Lock() - for !a.isResourceAvailableLocked(memory, isAsync) { - select { - case <-ctx.Done(): - c.L.Unlock() - return - default: - } - + for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil { c.Wait() } + if ctx.Err() != nil { + c.L.Unlock() + return + } + var asyncMem, syncMem uint64 + var asyncCPU, syncCPU uint64 if isAsync { // async uses async pool only asyncMem = memory - } else if a.ramSyncTotal-a.ramSyncUsed >= memory { - // if sync fits in sync pool - syncMem = memory + asyncCPU = cpuQuota } else { // sync takes from the sync pool first, then spills over into the async pool - syncMem = a.ramSyncTotal - a.ramSyncUsed + syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory) + syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota) + asyncMem = memory - syncMem + asyncCPU = cpuQuota - syncCPU } a.ramAsyncUsed += asyncMem a.ramSyncUsed += syncMem + a.cpuAsyncUsed += asyncCPU + a.cpuSyncUsed += syncCPU c.L.Unlock() t := &resourceToken{decrement: func() { + c.L.Lock() a.ramAsyncUsed -= asyncMem a.ramSyncUsed -= syncMem + a.cpuAsyncUsed -= asyncCPU + a.cpuSyncUsed -= syncCPU c.L.Unlock() // WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but @@ -159,7 +182,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, i return ch } -// WaitAsyncResource will send a signal on the returned channel when RAM in-use +// WaitAsyncResource will send a signal on the returned channel when RAM and CPU in-use // in the async area is less than high water mark func (a *resourceTracker) WaitAsyncResource() chan struct{} { ch := make(chan struct{}) @@ -167,7 +190,7 @@ func (a *resourceTracker) WaitAsyncResource() chan struct{} { c := a.cond go func() { c.L.Lock() - for a.ramAsyncUsed >= a.ramAsyncHWMark { + for a.ramAsyncUsed >= a.ramAsyncHWMark || a.cpuAsyncUsed >= a.cpuAsyncHWMark { c.Wait() } c.L.Unlock() @@ -198,6 +221,71 @@ func clampUint64(val, min, max uint64) uint64 { return val } +func (a *resourceTracker) initializeCPU() { + + var maxSyncCPU, maxAsyncCPU, cpuAsyncHWMark uint64 + var totalCPU, availCPU uint64 + + if runtime.GOOS == "linux" { + + // Why do we prefer /proc/cpuinfo on Linux and not just runtime.NumCPU? + // NumCPU is based on sched_getaffinity, while we prefer the host CPU count + // and then apply the cgroup limits below, which more likely match the cgroup used for the container runtime + numCPU, err := checkProcCPU() + if err != nil { + logrus.WithError(err).Error("Error checking for CPU, falling back to runtime CPU count.") + numCPU = uint64(runtime.NumCPU()) + } + + totalCPU = 1000 * numCPU + availCPU = totalCPU + + // Clamp further if cgroups CFS quota/period limits are in place + cgroupCPU := checkCgroupCPU() + if cgroupCPU > 0 { + availCPU = minUint64(availCPU, cgroupCPU) + } + + // TODO: check cgroup cpuset to clamp this further. We might be restricted into + // a subset of CPUs. (eg. 
/sys/fs/cgroup/cpuset/cpuset.effective_cpus) + + // TODO: skip CPU headroom for ourselves for now + } else { + totalCPU = uint64(runtime.NumCPU() * 1000) + availCPU = totalCPU + } + + logrus.WithFields(logrus.Fields{ + "totalCPU": totalCPU, + "availCPU": availCPU, + }).Info("available cpu") + + // reserve 20% of cpu for sync-only functions + maxSyncCPU = uint64(availCPU * 2 / 10) + maxAsyncCPU = availCPU - maxSyncCPU + cpuAsyncHWMark = maxAsyncCPU * 8 / 10 + + logrus.WithFields(logrus.Fields{ + "cpuSync": maxSyncCPU, + "cpuAsync": maxAsyncCPU, + "cpuAsyncHWMark": cpuAsyncHWMark, + }).Info("sync and async cpu reservations") + + if maxSyncCPU == 0 || maxAsyncCPU == 0 { + logrus.Fatal("Cannot get the proper CPU information to size server") + } + + if maxSyncCPU+maxAsyncCPU < 1000 { + logrus.Warn("Severely limited CPU: cpuSync + cpuAsync < 1000m (1 CPU)") + } else if maxAsyncCPU < 1000 { + logrus.Warn("Severely limited CPU: cpuAsync < 1000m (1 CPU)") + } + + a.cpuAsyncHWMark = cpuAsyncHWMark + a.cpuSyncTotal = maxSyncCPU + a.cpuAsyncTotal = maxAsyncCPU +} + func (a *resourceTracker) initializeMemory() { var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64 @@ -205,7 +293,7 @@ if runtime.GOOS == "linux" { // system wide available memory - totalMemory, err := checkProc() + totalMemory, err := checkProcMem() if err != nil { logrus.WithError(err).Fatal("Cannot get the proper memory information to size server.") } @@ -213,7 +301,7 @@ availMemory := totalMemory // cgroup limit restriction on memory usage - cGroupLimit, err := checkCgroup() + cGroupLimit, err := checkCgroupMem() if err != nil { logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..") } else { @@ -251,7 +339,7 @@ "ramSync": maxSyncMemory, "ramAsync": maxAsyncMemory, "ramAsyncHWMark": ramAsyncHWMark, - }).Info("sync and async reservations") + }).Info("sync and async ram reservations") if maxSyncMemory == 0 || maxAsyncMemory == 0 { logrus.Fatal("Cannot get the proper memory pool information to size server") @@ -286,24 +374,56 @@ func getMemoryHeadRoom(usableMemory uint64) (uint64, error) { return headRoom, nil } -func checkCgroup() (uint64, error) { - f, err := os.Open("/sys/fs/cgroup/memory/memory.limit_in_bytes") +func readString(fileName string) (string, error) { + b, err := ioutil.ReadFile(fileName) + if err != nil { + return "", err + } + value := string(b) + return strings.TrimSpace(value), nil +} + +func checkCgroupMem() (uint64, error) { + value, err := readString("/sys/fs/cgroup/memory/memory.limit_in_bytes") if err != nil { return 0, err } - defer f.Close() - b, err := ioutil.ReadAll(f) + return strconv.ParseUint(value, 10, 64) +} + +func checkCgroupCPU() uint64 { + periodStr, err := readString("/sys/fs/cgroup/cpu/cpu.cfs_period_us") if err != nil { - return 0, err + return 0 } - limBytes := string(b) - limBytes = strings.TrimSpace(limBytes) - return strconv.ParseUint(limBytes, 10, 64) + quotaStr, err := readString("/sys/fs/cgroup/cpu/cpu.cfs_quota_us") + if err != nil { + return 0 + } + + period, err := strconv.ParseUint(periodStr, 10, 64) + if err != nil { + logrus.Warn("Cannot parse CFS period", err) + return 0 + } + + quota, err := strconv.ParseInt(quotaStr, 10, 64) + if err != nil { + logrus.Warn("Cannot parse CFS quota", err) + return 0 + } + + if quota <= 0 || period <= 0 { + return 0 + } + + return uint64(quota) * 1000 / period } 
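+ +// Worked example (hypothetical cgroup values): cpu.cfs_quota_us=200000 with +// cpu.cfs_period_us=100000 yields 200000 * 1000 / 100000 = 2000 milliCPUs, the +// equivalent of 2 CPUs. An unset quota (-1) or a parse failure returns 0, which +// initializeCPU above treats as "no cgroup limit". 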
var errCantReadMemInfo = errors.New("Didn't find MemAvailable in /proc/meminfo, kernel is probably < 3.14") -func checkProc() (uint64, error) { +func checkProcMem() (uint64, error) { f, err := os.Open("/proc/meminfo") if err != nil { return 0, err } @@ -341,3 +461,30 @@ return 0, errCantReadMemInfo } + +func checkProcCPU() (uint64, error) { + f, err := os.Open("/proc/cpuinfo") + if err != nil { + return 0, err + } + defer f.Close() + + total := uint64(0) + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + b := scanner.Text() + + // processor : 0 + toks := strings.Fields(b) + if len(toks) == 3 && toks[0] == "processor" && toks[1] == ":" { + total++ + } + } + + if total == 0 { + return 0, errors.New("Could not parse cpuinfo") + } + + return total, nil +} diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go index 2087a9062..3044fc492 100644 --- a/api/agent/resource_test.go +++ b/api/agent/resource_test.go @@ -1,6 +1,8 @@ package agent import ( + "context" + "errors" "testing" "time" ) @@ -8,11 +10,17 @@ func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) { tr.cond.L.Lock() - tr.ramSyncTotal = vals.st - tr.ramSyncUsed = vals.su - tr.ramAsyncTotal = vals.at - tr.ramAsyncUsed = vals.au - tr.ramAsyncHWMark = vals.am + tr.ramSyncTotal = vals.mst + tr.ramSyncUsed = vals.msu + tr.ramAsyncTotal = vals.mat + tr.ramAsyncUsed = vals.mau + tr.ramAsyncHWMark = vals.mam + + tr.cpuSyncTotal = vals.cst + tr.cpuSyncUsed = vals.csu + tr.cpuAsyncTotal = vals.cat + tr.cpuAsyncUsed = vals.cau + tr.cpuAsyncHWMark = vals.cam tr.cond.L.Unlock() tr.cond.Broadcast() @@ -22,24 +30,72 @@ func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) { tr.cond.L.Lock() - vals.st = tr.ramSyncTotal - vals.su = tr.ramSyncUsed - vals.at = tr.ramAsyncTotal - vals.au = tr.ramAsyncUsed - vals.am = tr.ramAsyncHWMark + vals.mst = tr.ramSyncTotal + vals.msu = tr.ramSyncUsed + vals.mat = tr.ramAsyncTotal + vals.mau = tr.ramAsyncUsed + vals.mam = tr.ramAsyncHWMark + + vals.cst = tr.cpuSyncTotal + vals.csu = tr.cpuSyncUsed + vals.cat = tr.cpuAsyncTotal + vals.cau = tr.cpuAsyncUsed + vals.cam = tr.cpuAsyncHWMark tr.cond.L.Unlock() } +// helper to debug print (fields correspond to resourceTracker CPU/MEM fields) type trackerVals struct { - st uint64 - su uint64 - at uint64 - au uint64 - am uint64 + mst uint64 + msu uint64 + mat uint64 + mau uint64 + mam uint64 + cst uint64 + csu uint64 + cat uint64 + cau uint64 + cam uint64 } -func TestResourceAsyncMem(t *testing.T) { +func (vals *trackerVals) setDefaults() { + // set these to known vals (4GB total: 1GB sync, 3GB async) + vals.mst = 1 * Mem1GB + vals.msu = 0 + vals.mat = 3 * Mem1GB + vals.mau = 0 + vals.mam = 1 * Mem1GB + + // let's assume 10 CPUs (2 CPU sync, 8 CPU async) + vals.cst = 2000 + vals.csu = 0 + vals.cat = 8000 + vals.cau = 0 + vals.cam = 6000 +} + +func fetchToken(ch <-chan ResourceToken) (ResourceToken, error) { + select { + case tok := <-ch: + return tok, nil + case <-time.After(time.Duration(500) * time.Millisecond): + return nil, errors.New("expected token") + } +} + +func isClosed(ch <-chan ResourceToken) bool { + select { + case _, ok := <-ch: + if !ok { + return true + } + default: + } + return false } + +func TestResourceAsyncWait(t *testing.T) { var vals trackerVals @@ -48,36 +104,242 @@ tr := trI.(*resourceTracker) getTrackerTestVals(tr, &vals) - if vals.st <= 0 || vals.su != 0 || vals.at <= 0 || vals.au != 0 || vals.am <= 0 { - 
t.Fatalf("faulty init %#v", vals) + if vals.mst <= 0 || vals.msu != 0 || vals.mat <= 0 || vals.mau != 0 || vals.mam <= 0 { + t.Fatalf("faulty init MEM %#v", vals) + } + if vals.cst <= 0 || vals.csu != 0 || vals.cat <= 0 || vals.cau != 0 || vals.cam <= 0 { + t.Fatalf("faulty init CPU %#v", vals) } - // set set these to known vals - vals.st = 1 * 1024 * 1024 - vals.su = 0 - vals.at = 2 * 1024 * 1024 - vals.au = 0 - vals.am = 1 * 1024 * 1024 + vals.setDefaults() // should block & wait - vals.au = vals.am + vals.mau = vals.mam setTrackerTestVals(tr, &vals) ch := tr.WaitAsyncResource() select { case <-ch: - t.Fatal("high water mark over, should not trigger") + t.Fatal("high water mark MEM over, should not trigger") case <-time.After(time.Duration(500) * time.Millisecond): } // should not block & wait - vals.au = 0 + vals.mau = 0 setTrackerTestVals(tr, &vals) select { case <-ch: case <-time.After(time.Duration(500) * time.Millisecond): - t.Fatal("high water mark not over, should trigger") + t.Fatal("high water mark MEM not over, should trigger") } + + // get a new channel to prevent previous test interference + ch = tr.WaitAsyncResource() + + // should block & wait + vals.cau = vals.cam + setTrackerTestVals(tr, &vals) + + select { + case <-ch: + t.Fatal("high water mark CPU over, should not trigger") + case <-time.After(time.Duration(500) * time.Millisecond): + } + + // should not block & wait + vals.cau = 0 + setTrackerTestVals(tr, &vals) + + select { + case <-ch: + case <-time.After(time.Duration(500) * time.Millisecond): + t.Fatal("high water mark CPU not over, should trigger") + } + +} + +func TestResourceGetSimple(t *testing.T) { + + var vals trackerVals + trI := NewResourceTracker() + tr := trI.(*resourceTracker) + + vals.setDefaults() + + // let's make it like CPU and MEM are 100% full + vals.mau = vals.mat + vals.cau = vals.cat + + setTrackerTestVals(tr, &vals) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ask for 4GB and 1000 mCPU (1 CPU) + ch := trI.GetResourceToken(ctx, 4*1024, 1000, false) + + _, err := fetchToken(ch) + if err == nil { + t.Fatalf("full system should not hand out token") + } + + // reset back + vals.setDefaults() + setTrackerTestVals(tr, &vals) + + tok, err := fetchToken(ch) + if err != nil { + t.Fatalf("empty system should hand out token") + } + + // ask for another 4GB and 1000 mCPU + ch = trI.GetResourceToken(ctx, 4*1024, 1000, false) + + _, err = fetchToken(ch) + if err == nil { + t.Fatalf("full system should not hand out token") + } + + // closing the giant token releases its resources + tok.Close() + + tok, err = fetchToken(ch) + if err != nil { + t.Fatalf("empty system should hand out token") + } + + tok.Close() + + // POOLS should all be empty now + getTrackerTestVals(tr, &vals) + if vals.msu != 0 || vals.mau != 0 { + t.Fatalf("faulty state MEM %#v", vals) + } + if vals.csu != 0 || vals.cau != 0 { + t.Fatalf("faulty state CPU %#v", vals) + } +} + +func TestResourceGetCombo(t *testing.T) { + + var vals trackerVals + trI := NewResourceTracker() + tr := trI.(*resourceTracker) + + vals.setDefaults() + setTrackerTestVals(tr, &vals) + + ctx, cancel := context.WithCancel(context.Background()) + + // impossible request + ch := trI.GetResourceToken(ctx, 20*1024, 20000, false) + if !isClosed(ch) { + t.Fatalf("impossible request should return closed channel") + } + + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // let's use up 2 GB of 3GB async pool + ch = trI.GetResourceToken(ctx, 2*1024, 10, true) + tok1, err := 
fetchToken(ch) + if err != nil { + t.Fatalf("empty async system should hand out token1") + } + + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // remaining 1 GB async + ch = trI.GetResourceToken(ctx, 1*1024, 11, true) + tok2, err := fetchToken(ch) + if err != nil { + t.Fatalf("empty async system should hand out token2") + } + + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // NOW ASYNC POOL IS FULL + // SYNC POOL HAS 1GB + + // we no longer can get async token + ch = trI.GetResourceToken(ctx, 1*1024, 12, true) + _, err = fetchToken(ch) + if err == nil { + t.Fatalf("full async system should not hand out a token") + } + + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // but we should get 1GB sync token + ch = trI.GetResourceToken(ctx, 1*1024, 13, false) + tok3, err := fetchToken(ch) + if err != nil { + t.Fatalf("empty sync system should hand out token3") + } + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // NOW ASYNC AND SYNC POOLS ARE FULL + + // this should fail + ch = trI.GetResourceToken(ctx, 1*1024, 14, false) + _, err = fetchToken(ch) + if err == nil { + t.Fatalf("full system should not hand out a token") + } + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // now let's free up some async pool, release tok2 (1GB) + tok2.Close() + + // NOW ASYNC POOL HAS 1GB FREE + // SYNC POOL IS FULL + + // async pool should provide this + ch = trI.GetResourceToken(ctx, 1*1024, 15, false) + tok4, err := fetchToken(ch) + if err != nil { + t.Fatalf("async system should hand out token4") + } + cancel() + ctx, cancel = context.WithCancel(context.Background()) + + // NOW ASYNC AND SYNC POOLS ARE FULL + + tok4.Close() + tok3.Close() + + // NOW ASYNC POOL HAS 1GB FREE + // SYNC POOL HAS 1GB FREE + + // now, we ask for 2GB sync token, it should be provided from both async+sync pools + ch = trI.GetResourceToken(ctx, 2*1024, 16, false) + tok5, err := fetchToken(ch) + if err != nil { + t.Fatalf("async+sync system should hand out token5") + } + cancel() + + // NOW ASYNC AND SYNC POOLS ARE FULL + + tok1.Close() + tok5.Close() + + // attempt to close tok2 twice.. This should be OK. + tok2.Close() + + // POOLS should all be empty now + getTrackerTestVals(tr, &vals) + if vals.msu != 0 || vals.mau != 0 { + t.Fatalf("faulty state MEM %#v", vals) + } + if vals.csu != 0 || vals.cau != 0 { + t.Fatalf("faulty state CPU %#v", vals) } } diff --git a/api/agent/slots.go b/api/agent/slots.go index 38d96ba44..0eff17da3 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -334,6 +334,7 @@ func getSlotQueueKey(call *call) string { fmt.Fprint(hash, call.Timeout, "\x00") fmt.Fprint(hash, call.IdleTimeout, "\x00") fmt.Fprint(hash, call.Memory, "\x00") + fmt.Fprint(hash, call.CPUs, "\x00") fmt.Fprint(hash, call.Format, "\x00") // we have to sort these before printing, yay. 
TODO do better diff --git a/api/datastore/internal/datastoretest/test.go b/api/datastore/internal/datastoretest/test.go index efaca1b44..94b2ff685 100644 --- a/api/datastore/internal/datastoretest/test.go +++ b/api/datastore/internal/datastoretest/test.go @@ -529,6 +529,7 @@ func Test(t *testing.T, dsf func(t *testing.T) models.Datastore) { Format: "http", IdleTimeout: testRoute.IdleTimeout, Memory: testRoute.Memory, + CPUs: testRoute.CPUs, // updated Timeout: 2, Config: map[string]string{ @@ -572,6 +573,7 @@ func Test(t *testing.T, dsf func(t *testing.T) models.Datastore) { Format: "http", Timeout: 2, Memory: testRoute.Memory, + CPUs: testRoute.CPUs, IdleTimeout: testRoute.IdleTimeout, // updated Config: map[string]string{ diff --git a/api/datastore/sql/migrations/7_add_route_cpus.down.sql b/api/datastore/sql/migrations/7_add_route_cpus.down.sql new file mode 100644 index 000000000..a4fcefbac --- /dev/null +++ b/api/datastore/sql/migrations/7_add_route_cpus.down.sql @@ -0,0 +1 @@ +ALTER TABLE routes DROP COLUMN cpus; diff --git a/api/datastore/sql/migrations/7_add_route_cpus.up.sql b/api/datastore/sql/migrations/7_add_route_cpus.up.sql new file mode 100644 index 000000000..177446f97 --- /dev/null +++ b/api/datastore/sql/migrations/7_add_route_cpus.up.sql @@ -0,0 +1 @@ +ALTER TABLE routes ADD cpus int; diff --git a/api/datastore/sql/migrations/README.md b/api/datastore/sql/migrations/README.md index 5fd33f2a5..c8d2563a7 100644 --- a/api/datastore/sql/migrations/README.md +++ b/api/datastore/sql/migrations/README.md @@ -22,7 +22,7 @@ README: $ go generate ``` -NOTE: You may need to `go get github.com/jteeuwen/go-bindata` before running `go +NOTE: You may need to `go get -u github.com/jteeuwen/go-bindata/...` before running `go generate` in order for it to work. After running `go generate`, the `migrations.go` file should be updated. Check diff --git a/api/datastore/sql/migrations/migrations.go b/api/datastore/sql/migrations/migrations.go index c7a253e6b..b9e002586 100644 --- a/api/datastore/sql/migrations/migrations.go +++ b/api/datastore/sql/migrations/migrations.go @@ -12,6 +12,8 @@ // 5_add_app_created_at.up.sql // 6_add_app_updated_at.down.sql // 6_add_app_updated_at.up.sql +// 7_add_route_cpus.down.sql +// 7_add_route_cpus.up.sql // DO NOT EDIT! 
package migrations @@ -94,7 +96,7 @@ func _1_add_route_created_atDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_add_route_created_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1511259011, 0)} + info := bindataFileInfo{name: "1_add_route_created_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1510963763, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -114,7 +116,7 @@ func _1_add_route_created_atUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_add_route_created_at.up.sql", size: 40, mode: os.FileMode(420), modTime: time.Unix(1511919777, 0)} + info := bindataFileInfo{name: "1_add_route_created_at.up.sql", size: 40, mode: os.FileMode(420), modTime: time.Unix(1510963763, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -134,7 +136,7 @@ func _2_add_call_statsDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "2_add_call_stats.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1511259011, 0)} + info := bindataFileInfo{name: "2_add_call_stats.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1511917353, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -154,7 +156,7 @@ func _2_add_call_statsUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "2_add_call_stats.up.sql", size: 34, mode: os.FileMode(420), modTime: time.Unix(1511259011, 0)} + info := bindataFileInfo{name: "2_add_call_stats.up.sql", size: 34, mode: os.FileMode(420), modTime: time.Unix(1511917353, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -174,7 +176,7 @@ func _3_add_call_errorDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "3_add_call_error.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1511301534, 0)} + info := bindataFileInfo{name: "3_add_call_error.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1511989827, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -194,7 +196,7 @@ func _3_add_call_errorUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "3_add_call_error.up.sql", size: 34, mode: os.FileMode(420), modTime: time.Unix(1511301534, 0)} + info := bindataFileInfo{name: "3_add_call_error.up.sql", size: 34, mode: os.FileMode(420), modTime: time.Unix(1511989827, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -214,7 +216,7 @@ func _4_add_route_updated_atDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "4_add_route_updated_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1513728957, 0)} + info := bindataFileInfo{name: "4_add_route_updated_at.down.sql", size: 43, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -234,7 +236,7 @@ func _4_add_route_updated_atUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "4_add_route_updated_at.up.sql", size: 48, mode: os.FileMode(420), modTime: time.Unix(1513730369, 0)} + info := bindataFileInfo{name: "4_add_route_updated_at.up.sql", size: 48, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -254,7 +256,7 @@ func _5_add_app_created_atDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "5_add_app_created_at.down.sql", size: 41, mode: os.FileMode(420), modTime: time.Unix(1513730497, 0)} + info := bindataFileInfo{name: 
"5_add_app_created_at.down.sql", size: 41, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -274,7 +276,7 @@ func _5_add_app_created_atUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "5_add_app_created_at.up.sql", size: 46, mode: os.FileMode(420), modTime: time.Unix(1513730527, 0)} + info := bindataFileInfo{name: "5_add_app_created_at.up.sql", size: 46, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -294,7 +296,7 @@ func _6_add_app_updated_atDownSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "6_add_app_updated_at.down.sql", size: 41, mode: os.FileMode(420), modTime: time.Unix(1513733616, 0)} + info := bindataFileInfo{name: "6_add_app_updated_at.down.sql", size: 41, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -314,7 +316,47 @@ func _6_add_app_updated_atUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "6_add_app_updated_at.up.sql", size: 46, mode: os.FileMode(420), modTime: time.Unix(1513733621, 0)} + info := bindataFileInfo{name: "6_add_app_updated_at.up.sql", size: 46, mode: os.FileMode(420), modTime: time.Unix(1514060619, 0)} + a := &asset{bytes: bytes, info: info} + return a, nil +} + +var __7_add_route_cpusDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x28\xca\x2f\x2d\x49\x2d\x56\x70\x09\xf2\x0f\x50\x70\xf6\xf7\x09\xf5\xf5\x53\x48\x2e\x28\x2d\xb6\xe6\x02\x04\x00\x00\xff\xff\xec\x60\x24\xd0\x25\x00\x00\x00") + +func _7_add_route_cpusDownSqlBytes() ([]byte, error) { + return bindataRead( + __7_add_route_cpusDownSql, + "7_add_route_cpus.down.sql", + ) +} + +func _7_add_route_cpusDownSql() (*asset, error) { + bytes, err := _7_add_route_cpusDownSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "7_add_route_cpus.down.sql", size: 37, mode: os.FileMode(420), modTime: time.Unix(1515624756, 0)} + a := &asset{bytes: bytes, info: info} + return a, nil +} + +var __7_add_route_cpusUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x28\xca\x2f\x2d\x49\x2d\x56\x70\x74\x71\x51\x48\x2e\x28\x2d\x56\xc8\xcc\x2b\xb1\xe6\x02\x04\x00\x00\xff\xff\xf1\x18\xf8\xa9\x21\x00\x00\x00") + +func _7_add_route_cpusUpSqlBytes() ([]byte, error) { + return bindataRead( + __7_add_route_cpusUpSql, + "7_add_route_cpus.up.sql", + ) +} + +func _7_add_route_cpusUpSql() (*asset, error) { + bytes, err := _7_add_route_cpusUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "7_add_route_cpus.up.sql", size: 33, mode: os.FileMode(420), modTime: time.Unix(1515628068, 0)} a := &asset{bytes: bytes, info: info} return a, nil } @@ -383,6 +425,8 @@ var _bindata = map[string]func() (*asset, error){ "5_add_app_created_at.up.sql": _5_add_app_created_atUpSql, "6_add_app_updated_at.down.sql": _6_add_app_updated_atDownSql, "6_add_app_updated_at.up.sql": _6_add_app_updated_atUpSql, + "7_add_route_cpus.down.sql": _7_add_route_cpusDownSql, + "7_add_route_cpus.up.sql": _7_add_route_cpusUpSql, } // AssetDir returns the file names below a certain @@ -438,6 +482,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "5_add_app_created_at.up.sql": &bintree{_5_add_app_created_atUpSql, map[string]*bintree{}}, "6_add_app_updated_at.down.sql": &bintree{_6_add_app_updated_atDownSql, 
map[string]*bintree{}}, "6_add_app_updated_at.up.sql": &bintree{_6_add_app_updated_atUpSql, map[string]*bintree{}}, + "7_add_route_cpus.down.sql": &bintree{_7_add_route_cpusDownSql, map[string]*bintree{}}, + "7_add_route_cpus.up.sql": &bintree{_7_add_route_cpusUpSql, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory diff --git a/api/datastore/sql/sql.go b/api/datastore/sql/sql.go index 474dc96be..b71618400 100644 --- a/api/datastore/sql/sql.go +++ b/api/datastore/sql/sql.go @@ -49,6 +49,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( image varchar(256) NOT NULL, format varchar(16) NOT NULL, memory int NOT NULL, + cpus int, timeout int NOT NULL, idle_timeout int NOT NULL, type varchar(16) NOT NULL, @@ -87,7 +88,7 @@ var tables = [...]string{`CREATE TABLE IF NOT EXISTS routes ( } const ( - routeSelector = `SELECT app_name, path, image, format, memory, type, timeout, idle_timeout, headers, config, created_at, updated_at FROM routes` + routeSelector = `SELECT app_name, path, image, format, memory, cpus, type, timeout, idle_timeout, headers, config, created_at, updated_at FROM routes` callSelector = `SELECT id, created_at, started_at, completed_at, status, app_name, path, stats, error FROM calls` ) @@ -453,6 +454,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode image, format, memory, + cpus, type, timeout, idle_timeout, @@ -467,6 +469,7 @@ func (ds *sqlStore) InsertRoute(ctx context.Context, route *models.Route) (*mode :image, :format, :memory, + :cpus, :type, :timeout, :idle_timeout, @@ -507,6 +510,7 @@ func (ds *sqlStore) UpdateRoute(ctx context.Context, newroute *models.Route) (*m image = :image, format = :format, memory = :memory, + cpus = :cpus, type = :type, timeout = :timeout, idle_timeout = :idle_timeout, diff --git a/api/models/call.go b/api/models/call.go index 402991231..d3e320a62 100644 --- a/api/models/call.go +++ b/api/models/call.go @@ -118,6 +118,11 @@ type Call struct { // Memory is the amount of RAM this call is allocated. 
Memory uint64 `json:"memory,omitempty" db:"-"` + // CPUs is the CPU allocation for this call in MilliCPUs, where each CPU core is split into 1000 units. It is specified either + // *) in milliCPUs, eg. "100m", which is 1/10 of a CPU, or + // *) as a floating point number, eg. "0.1", which is also 1/10 of a CPU + CPUs MilliCPUs `json:"cpus,omitempty" db:"-"` + // Config is the set of configuration variables for the call Config Config `json:"config,omitempty" db:"-"` diff --git a/api/models/config.go b/api/models/config.go index 67b36b360..06bbaa57e 100644 --- a/api/models/config.go +++ b/api/models/config.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" ) type Config map[string]string @@ -125,3 +126,67 @@ func (h *Headers) Scan(value interface{}) error { // otherwise, return an error return fmt.Errorf("headers invalid db format: %T %T value, err: %v", value, bv, err) } + +// MilliCPU units +type MilliCPUs uint64 + +const ( + MinMilliCPUs = 0 // 0 is unlimited + MaxMilliCPUs = 1024000 // 1024 CPUs +) + +// implements fmt.Stringer +func (c MilliCPUs) String() string { + if c == 0 { + return "" + } + return fmt.Sprintf("%dm", c) +} + +// implements json.Unmarshaler +func (c *MilliCPUs) UnmarshalJSON(data []byte) error { + + outer := bytes.TrimSpace(data) + if !bytes.HasSuffix(outer, []byte("\"")) || !bytes.HasPrefix(outer, []byte("\"")) { + return ErrInvalidJSON + } + + outer = bytes.TrimPrefix(outer, []byte("\"")) + outer = bytes.TrimSuffix(outer, []byte("\"")) + outer = bytes.TrimSpace(outer) + if len(outer) == 0 { + *c = 0 + return nil + } + + if bytes.HasSuffix(outer, []byte("m")) { + + // Support milli cores as "100m" + outer = bytes.TrimSuffix(outer, []byte("m")) + mCPU, err := strconv.ParseUint(string(outer), 10, 64) + if err != nil || mCPU > MaxMilliCPUs || mCPU < MinMilliCPUs { + return ErrInvalidCPUs + } + *c = MilliCPUs(mCPU) + } else { + // Support for floating point "0.1" style CPU units + fCPU, err := strconv.ParseFloat(string(outer), 64) + if err != nil || fCPU < MinMilliCPUs/1000 || fCPU > MaxMilliCPUs/1000 { + return ErrInvalidCPUs + } + *c = MilliCPUs(fCPU * 1000) + } + + return nil +} + +// implements json.Marshaler +func (c *MilliCPUs) MarshalJSON() ([]byte, error) { + + if *c < MinMilliCPUs || *c > MaxMilliCPUs { + return nil, ErrInvalidCPUs + } + + // always use milli cpus "1000m" format + return []byte(fmt.Sprintf("\"%s\"", c.String())), nil +} diff --git a/api/models/config_test.go b/api/models/config_test.go new file mode 100644 index 000000000..12336d7ad --- /dev/null +++ b/api/models/config_test.go @@ -0,0 +1,112 @@ +package models + +import ( + "encoding/json" + "fmt" + "testing" +) + +func checkStr(input string, expected MilliCPUs) error { + var res MilliCPUs + err := json.Unmarshal([]byte(input), &res) + if err != nil { + return err + } + if expected != res { + return fmt.Errorf("mismatch %s != %s", res, expected) + } + return nil +} + +func checkErr(input string) (MilliCPUs, error) { + var res MilliCPUs + err := json.Unmarshal([]byte(input), &res) + if err != nil { + return res, err + } + return res, nil +} + +func TestMilliCPUsUnmarshal(t *testing.T) { + + err := checkStr("\"1.00\"", 1000) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"1\"", 1000) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"0\"", 0) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"00000\"", 0) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"+00000\"", 0) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"-00000\"", 0) 
+ if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"0.01\"", 10) + if err != nil { + t.Fatal("failed: ", err) + } + + tmp, err := checkErr("\"1000000000000000000000000\"") + if err == nil { + t.Fatal("failed, should get error, got: ", tmp) + } + + // 0.2341 carries more precision than milliCPUs can represent; the extra digits are truncated to 234m + err = checkStr("\"0.2341\"", 234) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"1m\"", 1) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"1000m\"", 1000) + if err != nil { + t.Fatal("failed: ", err) + } + + tmp, err = checkErr("\"-100\"") + if err == nil { + t.Fatal("failed, should get error, got: ", tmp) + } + + tmp, err = checkErr("\"100000000000m\"") + if err == nil { + t.Fatal("failed, should get error, got: ", tmp) + } + + err = checkStr("\".2\"", 200) + if err != nil { + t.Fatal("failed: ", err) + } + + err = checkStr("\"100.2000\"", 100200) + if err != nil { + t.Fatal("failed: ", err) + } + + tmp, err = checkErr("\"-0.20\"") + if err == nil { + t.Fatal("failed, should get error, got: ", tmp) + } +} diff --git a/api/models/error.go b/api/models/error.go index 5114297e2..e3cde7877 100644 --- a/api/models/error.go +++ b/api/models/error.go @@ -184,6 +184,11 @@ var ( code: http.StatusNotFound, error: errors.New("Path not found"), } + ErrInvalidCPUs = err{ + code: http.StatusBadRequest, + error: fmt.Errorf("CPUs is invalid. Value should be either between [%.3f and %.3f] or [%dm and %dm] milliCPU units", + float64(MinMilliCPUs)/1000.0, float64(MaxMilliCPUs)/1000.0, MinMilliCPUs, MaxMilliCPUs), + } ) // APIError any error that implements this interface will return an API response @@ -202,6 +207,11 @@ func (e err) Code() int { return e.code } func NewAPIError(code int, e error) APIError { return err{code, e} } +func IsAPIError(e error) bool { + _, ok := e.(APIError) + return ok +} + // Error uniform error output type Error struct { Error *ErrorBody `json:"error,omitempty"` } diff --git a/api/models/route.go b/api/models/route.go index c2a0ce7a3..f08c4e0f3 100644 --- a/api/models/route.go +++ b/api/models/route.go @@ -29,6 +29,7 @@ type Route struct { Path string `json:"path" db:"path"` Image string `json:"image" db:"image"` Memory uint64 `json:"memory" db:"memory"` + CPUs MilliCPUs `json:"cpus" db:"cpus"` Headers Headers `json:"headers,omitempty" db:"headers"` Type string `json:"type" db:"type"` Format string `json:"format" db:"format"` @@ -161,6 +162,7 @@ func (r1 *Route) Equals(r2 *Route) bool { eq = eq && r1.Path == r2.Path eq = eq && r1.Image == r2.Image eq = eq && r1.Memory == r2.Memory + eq = eq && r1.CPUs == r2.CPUs eq = eq && r1.Headers.Equals(r2.Headers) eq = eq && r1.Type == r2.Type eq = eq && r1.Format == r2.Format @@ -186,6 +188,9 @@ func (r *Route) Update(new *Route) { if new.Memory != 0 { r.Memory = new.Memory } + if new.CPUs != 0 { + r.CPUs = new.CPUs + } if new.Type != "" { r.Type = new.Type } diff --git a/api/models/route_test.go b/api/models/route_test.go new file mode 100644 index 000000000..d5e8e59d1 --- /dev/null +++ b/api/models/route_test.go @@ -0,0 +1,42 @@ +package models + +import ( + "testing" +) + +func TestRouteSimple(t *testing.T) { + + route1 := &Route{ + AppName: "test", + Path: "/some", + Image: "foo", + Memory: 128, + CPUs: 100, + Type: "sync", + Format: "http", + Timeout: 10, + IdleTimeout: 10, + } + + err := route1.Validate() + if err != nil { + t.Fatal("should not have failed, got: ", err) + } + + route2 := &Route{ + AppName: "test", + Path: "/some", + Image: "foo", + Memory: 128, + CPUs: 100, + Type: 
"sync", + Format: "nonsense", + Timeout: 10, + IdleTimeout: 10, + } + + err = route2.Validate() + if err == nil { + t.Fatalf("should have failed route: %#v", route2) + } +} diff --git a/api/server/apps_create.go b/api/server/apps_create.go index 97c2942df..8de723baa 100644 --- a/api/server/apps_create.go +++ b/api/server/apps_create.go @@ -14,7 +14,11 @@ func (s *Server) handleAppCreate(c *gin.Context) { err := c.BindJSON(&wapp) if err != nil { - handleErrorResponse(c, models.ErrInvalidJSON) + if models.IsAPIError(err) { + handleErrorResponse(c, err) + } else { + handleErrorResponse(c, models.ErrInvalidJSON) + } return } diff --git a/api/server/apps_update.go b/api/server/apps_update.go index 8f4161b9f..9c82b8fa3 100644 --- a/api/server/apps_update.go +++ b/api/server/apps_update.go @@ -15,7 +15,11 @@ func (s *Server) handleAppUpdate(c *gin.Context) { err := c.BindJSON(&wapp) if err != nil { - handleErrorResponse(c, models.ErrInvalidJSON) + if models.IsAPIError(err) { + handleErrorResponse(c, err) + } else { + handleErrorResponse(c, models.ErrInvalidJSON) + } return } diff --git a/api/server/hybrid.go b/api/server/hybrid.go index ed2f23275..de04c7c3f 100644 --- a/api/server/hybrid.go +++ b/api/server/hybrid.go @@ -17,7 +17,11 @@ func (s *Server) handleRunnerEnqueue(c *gin.Context) { var call models.Call err := c.BindJSON(&call) if err != nil { - handleErrorResponse(c, models.ErrInvalidJSON) + if models.IsAPIError(err) { + handleErrorResponse(c, err) + } else { + handleErrorResponse(c, models.ErrInvalidJSON) + } return } @@ -92,7 +96,11 @@ func (s *Server) handleRunnerStart(c *gin.Context) { var call models.Call err := c.BindJSON(&call) if err != nil { - handleErrorResponse(c, models.ErrInvalidJSON) + if models.IsAPIError(err) { + handleErrorResponse(c, err) + } else { + handleErrorResponse(c, models.ErrInvalidJSON) + } return } @@ -143,7 +151,11 @@ func (s *Server) handleRunnerFinish(c *gin.Context) { } err := c.BindJSON(&body) if err != nil { - handleErrorResponse(c, models.ErrInvalidJSON) + if models.IsAPIError(err) { + handleErrorResponse(c, err) + } else { + handleErrorResponse(c, models.ErrInvalidJSON) + } return } diff --git a/api/server/middleware_test.go b/api/server/middleware_test.go index cf19d4fbf..73b368f70 100644 --- a/api/server/middleware_test.go +++ b/api/server/middleware_test.go @@ -73,7 +73,7 @@ func TestRootMiddleware(t *testing.T) { {Name: "myapp2", Config: models.Config{}}, }, []*models.Route{ - {Path: "/", AppName: "myapp", Image: "fnproject/hello", Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: map[string][]string{"X-Function": {"Test"}}}, + {Path: "/", AppName: "myapp", Image: "fnproject/hello", Type: "sync", Memory: 128, CPUs: 100, Timeout: 30, IdleTimeout: 30, Headers: map[string][]string{"X-Function": {"Test"}}}, {Path: "/myroute", AppName: "myapp", Image: "fnproject/hello", Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: map[string][]string{"X-Function": {"Test"}}}, {Path: "/app2func", AppName: "myapp2", Image: "fnproject/hello", Type: "sync", Memory: 128, Timeout: 30, IdleTimeout: 30, Headers: map[string][]string{"X-Function": {"Test"}}, Config: map[string]string{"NAME": "johnny"}, diff --git a/api/server/routes_create_update.go b/api/server/routes_create_update.go index 729253001..03039c838 100644 --- a/api/server/routes_create_update.go +++ b/api/server/routes_create_update.go @@ -141,6 +141,9 @@ func (s *Server) ensureApp(ctx context.Context, wroute *models.RouteWrapper, met func bindRoute(c *gin.Context, method string, 
wroute *models.RouteWrapper) error { err := c.BindJSON(wroute) if err != nil { + if models.IsAPIError(err) { + return err + } return models.ErrInvalidJSON } diff --git a/api/server/routes_test.go b/api/server/routes_test.go index d278c78d2..103c852c4 100644 --- a/api/server/routes_test.go +++ b/api/server/routes_test.go @@ -107,6 +107,7 @@ func TestRouteCreate(t *testing.T) { {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "type": "sync" } }`, http.StatusBadRequest, models.ErrRoutesMissingPath}, {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "myroute", "type": "sync" } }`, http.StatusBadRequest, models.ErrRoutesInvalidPath}, {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/$/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync" } }`, http.StatusBadRequest, models.ErrAppsInvalidName}, + {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync", "cpus": "-100" } }`, http.StatusBadRequest, models.ErrInvalidCPUs}, {datastore.NewMockInit(nil, []*models.Route{ { @@ -118,6 +119,8 @@ func TestRouteCreate(t *testing.T) { // success {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync" } }`, http.StatusOK, nil}, + {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync", "cpus": "100m" } }`, http.StatusOK, nil}, + {datastore.NewMock(), logs.NewMock(), http.MethodPost, "/v1/apps/a/routes", `{ "route": { "image": "fnproject/hello", "path": "/myroute", "type": "sync", "cpus": "0.2" } }`, http.StatusOK, nil}, } { test.run(t, i, buf) } @@ -332,6 +335,7 @@ func TestRouteUpdate(t *testing.T) { {ds, logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "type": "async", "timeout": 121, "idle_timeout": 240 } }`, http.StatusOK, nil}, // should work if async {ds, logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "idle_timeout": 3601 } }`, http.StatusBadRequest, models.ErrRoutesInvalidIdleTimeout}, {ds, logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "memory": 100000000000000 } }`, http.StatusBadRequest, models.ErrRoutesInvalidMemory}, + {ds, logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/do", `{ "route": { "cpus": "foo" } }`, http.StatusBadRequest, models.ErrInvalidCPUs}, // TODO this should be correct, waiting for patch to come in //{ds, logs.NewMock(), http.MethodPatch, "/v1/apps/b/routes/myroute/dont", `{ "route": {} }`, http.StatusNotFound, models.ErrAppsNotFound}, {ds, logs.NewMock(), http.MethodPatch, "/v1/apps/a/routes/myroute/dont", `{ "route": {} }`, http.StatusNotFound, models.ErrRoutesNotFound}, diff --git a/api/server/runner_async_test.go b/api/server/runner_async_test.go index adad4303b..2188cf3b6 100644 --- a/api/server/runner_async_test.go +++ b/api/server/runner_async_test.go @@ -42,7 +42,7 @@ func TestRouteRunnerAsyncExecution(t *testing.T) { []*models.Route{ {Type: "async", Path: "/hot-http", AppName: "myapp", Image: "fnproject/fn-test-utils", Format: "http", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 4, IdleTimeout: 30}, {Type: "async", Path: "/hot-json", AppName: "myapp", Image: 
"fnproject/fn-test-utils", Format: "json", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 4, IdleTimeout: 30}, - {Type: "async", Path: "/myroute", AppName: "myapp", Image: "fnproject/hello", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 30, IdleTimeout: 30}, + {Type: "async", Path: "/myroute", AppName: "myapp", Image: "fnproject/hello", Config: map[string]string{"test": "true"}, Memory: 128, CPUs: 200, Timeout: 30, IdleTimeout: 30}, {Type: "async", Path: "/myerror", AppName: "myapp", Image: "fnproject/error", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 30, IdleTimeout: 30}, {Type: "async", Path: "/myroute/:param", AppName: "myapp", Image: "fnproject/hello", Config: map[string]string{"test": "true"}, Memory: 128, Timeout: 30, IdleTimeout: 30}, }, nil, diff --git a/docs/swagger.yml b/docs/swagger.yml index 672fda415..b4ca88ad0 100644 --- a/docs/swagger.yml +++ b/docs/swagger.yml @@ -488,6 +488,9 @@ definitions: type: integer format: uint64 description: Max usable memory for this route (MiB). + cpus: + type: string + description: Max usable CPU cores for this route. Value in MilliCPUs (eg. 500m) or as floating-point (eg. 0.5) type: enum: - sync