From 25f6706642d79c0f143d9e5524d20f72d19dce87 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Fri, 1 Dec 2017 11:21:16 -0800
Subject: [PATCH] Container memory tracking related changes (#541)

* fn: get available memory related changes

*) getAvailableMemory() improvements

*) fail early if the requested memory is too large to ever be met

*) track async and sync pools individually. The sync pool is reserved
   for sync jobs only, while the async pool can be used by all jobs.

*) headroom estimation for available memory on Linux.
---
 api/agent/agent.go    |  12 ++-
 api/agent/resource.go | 194 ++++++++++++++++++++++++++++++++++--------
 2 files changed, 168 insertions(+), 38 deletions(-)

diff --git a/api/agent/agent.go b/api/agent/agent.go
index 784c03532..1950eaa8c 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -275,6 +275,11 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	default:
 	}
 
+	// IMPORTANT: if this request was submitted indirectly through fnlb or another
+	// proxy, we keep classifying it as 'async', which is correct: async requests
+	// should use the async resources regardless of their origin.
+	isAsync := call.Type == models.TypeAsync
+
 	// add context cancel here to prevent ramToken/launch race, w/o this ramToken /
 	// launch won't know whether we are no longer receiving or not yet receiving.
 	ctx, launchCancel := context.WithCancel(ctx)
@@ -284,7 +289,10 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	select {
 	case s := <-slots:
 		return s, nil
-	case tok := <-a.resources.GetResourceToken(ctx, call):
+	case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, isAsync):
+		if !isOpen {
+			return nil, models.ErrCallTimeoutServerBusy
+		}
 		errCh = a.launch(ctx, slots, call, tok) // TODO mangle
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -494,7 +502,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
 	// about timing out if this takes a while...
 	cookie, err := a.driver.Prepare(ctx, container)
 	if err != nil {
-		tok.Close() // TODO make this less brittle
+		tok.Close()
 		return err
 	}
 
diff --git a/api/agent/resource.go b/api/agent/resource.go
index 4f14c3bf8..a20702528 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -17,30 +17,35 @@ import (
 )
 
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
-// TODO: improve memory implementation
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
 	WaitAsyncResource() chan struct{}
-	GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken
+	// returns a closed channel if the resource request can never be met
+	GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken
 }
 
 type resourceTracker struct {
-	// cond protects access to ramUsed
+	// cond protects access to the ram variables below
 	cond *sync.Cond
-	// ramTotal is the total accessible memory by this process
-	ramTotal uint64
-	// ramUsed is ram reserved for running containers. idle hot containers
-	// count against ramUsed.
-	ramUsed uint64
+	// ramSyncTotal is the total usable memory for sync functions
+	ramSyncTotal uint64
+	// ramSyncUsed is the ram reserved for running sync containers, including hot/idle ones
+	ramSyncUsed uint64
+	// ramAsyncTotal is the total usable memory for async + sync functions
+	ramAsyncTotal uint64
+	// ramAsyncUsed is the ram reserved for running async + sync containers, including hot/idle ones
+	ramAsyncUsed uint64
+	// ramAsyncHWMark is the async pool usage level at which the agent stops dequeuing async jobs
+	ramAsyncHWMark uint64
 }
 
 func NewResourceTracker() ResourceTracker {
 
 	obj := &resourceTracker{
-		cond:     sync.NewCond(new(sync.Mutex)),
-		ramTotal: getAvailableMemory(),
+		cond: sync.NewCond(new(sync.Mutex)),
 	}
 
+	obj.initializeMemory()
 	return obj
 }
 
@@ -58,18 +63,46 @@ func (t *resourceToken) Close() error {
 	return nil
 }
 
+func (a *resourceTracker) isResourceAvailableLocked(memory uint64, isAsync bool) bool {
+
+	asyncAvail := a.ramAsyncTotal - a.ramAsyncUsed
+	syncAvail := a.ramSyncTotal - a.ramSyncUsed
+
+	// Sync functions may steal from the async pool; async functions are restricted to the async pool.
+	if isAsync {
+		return asyncAvail >= memory
+	} else {
+		return asyncAvail+syncAvail >= memory
+	}
+}
+
+// is this request possible to meet at all? If not, fail fast
+func (a *resourceTracker) isResourcePossible(memory uint64, isAsync bool) bool {
+	if isAsync {
+		return memory <= a.ramAsyncTotal
+	} else {
+		return memory <= a.ramSyncTotal+a.ramAsyncTotal
+	}
+}
+
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
+func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken {
 
-	memory := call.Memory * 1024 * 1024
+	memory = memory * 1024 * 1024
 	c := a.cond
 	ch := make(chan ResourceToken)
 
+	if !a.isResourcePossible(memory, isAsync) {
+		close(ch)
+		return ch
+	}
+
 	go func() {
 		c.L.Lock()
-		for (a.ramUsed + memory) > a.ramTotal {
+
+		for !a.isResourceAvailableLocked(memory, isAsync) {
 			select {
 			case <-ctx.Done():
 				c.L.Unlock()
@@ -80,13 +113,33 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-ch
 			c.Wait()
 		}
 
-		a.ramUsed += memory
+		var asyncMem, syncMem uint64
+
+		if isAsync {
+			// async uses the async pool only
+			asyncMem = memory
+		} else if a.ramSyncTotal-a.ramSyncUsed >= memory {
+			// the sync request fits entirely in the sync pool
+			syncMem = memory
+		} else {
+			// the sync request takes what is left of the sync pool and spills into the async pool
+			syncMem = a.ramSyncTotal - a.ramSyncUsed
+			asyncMem = memory - syncMem
+		}
+
+		a.ramAsyncUsed += asyncMem
+		a.ramSyncUsed += syncMem
 		c.L.Unlock()
 
 		t := &resourceToken{decrement: func() {
 			c.L.Lock()
-			a.ramUsed -= memory
+			a.ramAsyncUsed -= asyncMem
+			a.ramSyncUsed -= syncMem
 			c.L.Unlock()
+
+			// WARNING: yes, we wake up everyone, even async waiters when only the sync
+			// pool has space, but the cost of this spurious wake up is unlikely to impact
+			// performance much. Simpler to use one cond variable for the time being.
 			c.Broadcast()
 		}}
 
@@ -101,15 +154,15 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-ch
 	return ch
 }
 
-// GetAsyncResource will send a signal on the returned channel when at least half of
-// the available RAM on this machine is free.
+// WaitAsyncResource will send a signal on the returned channel when the RAM
+// in use in the async pool is below the high water mark
 func (a *resourceTracker) WaitAsyncResource() chan struct{} {
 	ch := make(chan struct{})
 	c := a.cond
 
 	go func() {
 		c.L.Lock()
-		for (a.ramTotal/2)-a.ramUsed < 0 {
+		for a.ramAsyncUsed >= a.ramAsyncHWMark {
 			c.Wait()
 		}
 		c.L.Unlock()
@@ -120,31 +173,100 @@ func (a *resourceTracker) WaitAsyncResource() chan struct{} {
 	return ch
 }
 
-func getAvailableMemory() uint64 {
-	const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB
+func minUint64(a, b uint64) uint64 {
+	if a <= b {
+		return a
+	}
+	return b
+}
+
+func maxUint64(a, b uint64) uint64 {
+	if a >= b {
+		return a
+	}
+	return b
+}
+
+func clampUint64(val, min, max uint64) uint64 {
+	val = minUint64(val, max)
+	val = maxUint64(val, min)
+	return val
+}
+
+func (a *resourceTracker) initializeMemory() {
+
+	var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
 
-	var availableMemory uint64 = tooBig
 	if runtime.GOOS == "linux" {
-		var err error
-		availableMemory, err = checkCgroup()
+
+		// system-wide total memory
+		totalMemory, err := checkProc()
+		if err != nil {
+			logrus.WithError(err).Fatal("Cannot get the proper memory information to size the server.")
+		}
+
+		availMemory := totalMemory
+
+		// cgroup limit restriction on memory usage
+		cGroupLimit, err := checkCgroup()
 		if err != nil {
 			logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..")
+		} else {
+			availMemory = minUint64(cGroupLimit, availMemory)
 		}
-		if availableMemory >= tooBig || availableMemory <= 0 {
-			// Then -m flag probably wasn't set, so use max available on system
-			availableMemory, err = checkProc()
-			if availableMemory >= tooBig || availableMemory <= 0 {
-				logrus.WithError(err).Fatal("Cannot get the proper memory information to size server. You must specify the maximum available memory by passing the -m command with docker run when starting the server via docker, eg: `docker run -m 2G ...`")
-			}
-		}
+
+		// reduce the available memory by a headroom estimate (for docker, ourselves, other processes)
+		headRoom := getMemoryHeadRoom(availMemory)
+		availMemory = availMemory - headRoom
+
+		logrus.WithFields(logrus.Fields{
+			"totalMemory": totalMemory,
+			"availMemory": availMemory,
+			"headRoom":    headRoom,
+			"cgroupLimit": cGroupLimit,
+		}).Info("available memory")
+
+		// reserve 20% of the ram for the sync-only pool
+		maxSyncMemory = uint64(availMemory * 2 / 10)
+		maxAsyncMemory = availMemory - maxSyncMemory
+		ramAsyncHWMark = maxAsyncMemory * 8 / 10
+
 	} else {
-		// This still lets 10-20 functions execute concurrently assuming a 2GB machine.
-		availableMemory = 2 * 1024 * 1024 * 1024
+		// non-linux: assume 512MB of sync-only memory and 1.5GB of async + sync memory
+		maxSyncMemory = 512 * 1024 * 1024
+		maxAsyncMemory = (1024 + 512) * 1024 * 1024
+		ramAsyncHWMark = 1024 * 1024 * 1024
 	}
 
-	logrus.WithFields(logrus.Fields{"ram": availableMemory}).Info("available memory")
+	// For a non-linux OS, we expect these (or their defaults) to be properly configured via command-line/env
+	logrus.WithFields(logrus.Fields{
+		"ramSync":        maxSyncMemory,
+		"ramAsync":       maxAsyncMemory,
+		"ramAsyncHWMark": ramAsyncHWMark,
+	}).Info("sync and async reservations")
 
-	return availableMemory
+	if maxSyncMemory == 0 || maxAsyncMemory == 0 {
+		logrus.Fatal("Cannot get the proper memory pool information to size the server")
+	}
+
+	a.ramAsyncHWMark = ramAsyncHWMark
+	a.ramSyncTotal = maxSyncMemory
+	a.ramAsyncTotal = maxAsyncMemory
+}
+
+// getMemoryHeadRoom estimates headroom to leave unused so that we do not consume the entire RAM
+func getMemoryHeadRoom(usableMemory uint64) uint64 {
+
+	// take 10% of the RAM
+	headRoom := uint64(usableMemory / 10)
+
+	// clamp this to 256MB min -- 5GB max
+	maxHeadRoom := uint64(5 * 1024 * 1024 * 1024)
+	minHeadRoom := uint64(256 * 1024 * 1024)
+	minHeadRoom = minUint64(minHeadRoom, usableMemory)
+
+	headRoom = clampUint64(headRoom, minHeadRoom, maxHeadRoom)
+	return headRoom
 }
 
 func checkCgroup() (uint64, error) {
@@ -154,11 +276,11 @@ func checkCgroup() (uint64, error) {
 	}
 	defer f.Close()
 	b, err := ioutil.ReadAll(f)
-	limBytes := string(b)
-	limBytes = strings.TrimSpace(limBytes)
 	if err != nil {
 		return 0, err
 	}
+	limBytes := string(b)
+	limBytes = strings.TrimSpace(limBytes)
 	return strconv.ParseUint(limBytes, 10, 64)
 }
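
Reviewer sketch (illustrative, not part of the patch): the snippet below is a
minimal, self-contained Go model of the two-pool accounting introduced in
resource.go, using the non-linux defaults above (512MB sync-only, 1.5GB
async + sync). The names pools, reserve, and release are hypothetical and do
not exist in the fn codebase; unlike the real GetResourceToken, which blocks
on a sync.Cond and hands out a ResourceToken over a channel, this sketch
simply fails fast when a request does not fit.

package main

import (
	"fmt"
	"sync"
)

// pools is a hypothetical stand-in for resourceTracker's ram* fields.
type pools struct {
	mu         sync.Mutex
	syncTotal  uint64 // reserved for sync requests only (ramSyncTotal)
	syncUsed   uint64
	asyncTotal uint64 // shared by async requests and sync spill-over (ramAsyncTotal)
	asyncUsed  uint64
}

// reserve takes memory for one request and returns a release func on success.
// Async requests draw from the async pool only; sync requests prefer the sync
// pool and spill any remainder into the async pool, mirroring the split in
// GetResourceToken and isResourceAvailableLocked.
func (p *pools) reserve(memory uint64, isAsync bool) (func(), bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	syncAvail := p.syncTotal - p.syncUsed
	asyncAvail := p.asyncTotal - p.asyncUsed

	var syncMem, asyncMem uint64
	switch {
	case isAsync && asyncAvail >= memory:
		asyncMem = memory // async is confined to the async pool
	case !isAsync && syncAvail >= memory:
		syncMem = memory // sync fits entirely in its own pool
	case !isAsync && syncAvail+asyncAvail >= memory:
		syncMem = syncAvail           // sync takes the sync remainder...
		asyncMem = memory - syncAvail // ...and spills the rest into async
	default:
		return nil, false // the real tracker would wait on the cond instead
	}

	p.syncUsed += syncMem
	p.asyncUsed += asyncMem

	// the release func returns both shares, as resourceToken.Close does
	return func() {
		p.mu.Lock()
		p.syncUsed -= syncMem
		p.asyncUsed -= asyncMem
		p.mu.Unlock()
	}, true
}

func main() {
	p := &pools{syncTotal: 512 << 20, asyncTotal: 1536 << 20}

	// A 1GB sync request fits: 512MB from the sync pool, 512MB spilled into async.
	release, ok := p.reserve(1<<30, false)
	fmt.Println("sync 1GB ok:", ok, "asyncUsed MB:", p.asyncUsed>>20)

	// A 1.5GB async request now exceeds the async pool's remaining 1GB.
	_, ok = p.reserve(1536<<20, true)
	fmt.Println("async 1.5GB ok:", ok)

	release() // frees both shares; the real token would also Broadcast()
}

Running this prints that the sync request succeeds while the async request is
refused: sync work may borrow from the async pool, but async work never
borrows from the sync reserve, which is also why isResourcePossible rejects a
request up front when it can never fit in the pools it is allowed to use.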