Container memory tracking related changes (#541)

* squash: This is a combination of 10 commits.

fn: get available memory related changes

*) getAvailableMemory() improvements
*) fail early if the requested memory can never be satisfied
*) track the async and sync pools individually. The sync pool
is reserved for sync jobs only, while the async pool can be
used by all jobs (see the sketch after this list).
*) headroom estimation for available memory on Linux.
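For reference, here is a minimal single-threaded sketch of the dual-pool accounting described above. The names (pools, reserve, release) are hypothetical and only illustrate the scheme; the tracker added by this commit guards the same bookkeeping with a sync.Cond and hands out resource tokens on a channel.

// NOTE: illustrative sketch only (hypothetical names), not the code added by
// this commit. It shows the pool accounting in a single-threaded form.
package main

import "fmt"

type pools struct {
	syncTotal, syncUsed   uint64 // reserved for sync jobs only
	asyncTotal, asyncUsed uint64 // usable by both async and sync jobs
}

// reserve returns how much to charge to each pool, or ok=false if the request
// cannot be satisfied right now. Async jobs may only use the async pool; sync
// jobs use the sync pool first and spill the remainder into the async pool.
func (p *pools) reserve(mem uint64, isAsync bool) (asyncMem, syncMem uint64, ok bool) {
	asyncAvail := p.asyncTotal - p.asyncUsed
	syncAvail := p.syncTotal - p.syncUsed

	switch {
	case isAsync && asyncAvail >= mem:
		asyncMem = mem
	case !isAsync && syncAvail >= mem:
		syncMem = mem
	case !isAsync && syncAvail+asyncAvail >= mem:
		syncMem = syncAvail
		asyncMem = mem - syncMem
	default:
		return 0, 0, false
	}
	p.asyncUsed += asyncMem
	p.syncUsed += syncMem
	return asyncMem, syncMem, true
}

// release undoes a reservation, mirroring the token's decrement closure.
func (p *pools) release(asyncMem, syncMem uint64) {
	p.asyncUsed -= asyncMem
	p.syncUsed -= syncMem
}

func main() {
	p := &pools{syncTotal: 200, asyncTotal: 800} // e.g. a 20%/80% split of 1000 MB

	fmt.Println(p.reserve(300, true))  // async job: 300 charged to the async pool
	fmt.Println(p.reserve(300, false)) // sync job: 200 sync + 100 stolen from async
	fmt.Println(p.reserve(600, true))  // async job: only 400 left in async pool -> false
}

Keeping a small sync-only reserve means latency-sensitive sync calls can still be scheduled when async work has filled the shared pool, which is why sync requests may spill into the async pool but not the reverse.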
Tolga Ceylan
2017-12-01 11:21:16 -08:00
committed by GitHub
parent ee2cf07feb
commit 25f6706642
2 changed files with 168 additions and 38 deletions

View File

@@ -275,6 +275,11 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	default:
 	}
 
+	// IMPORTANT: This means, if this request was submitted indirectly through fnlb or
+	// other proxy, we will continue classifying it as 'async' which is good as async
+	// regardless of origin should use the async resources.
+	isAsync := call.Type == models.TypeAsync
+
 	// add context cancel here to prevent ramToken/launch race, w/o this ramToken /
 	// launch won't know whether we are no longer receiving or not yet receiving.
 	ctx, launchCancel := context.WithCancel(ctx)
@@ -284,7 +289,10 @@ func (a *agent) launchOrSlot(ctx context.Context, slots chan slot, call *call) (
 	select {
 	case s := <-slots:
 		return s, nil
-	case tok := <-a.resources.GetResourceToken(ctx, call):
+	case tok, isOpen := <-a.resources.GetResourceToken(ctx, call.Memory, isAsync):
+		if !isOpen {
+			return nil, models.ErrCallTimeoutServerBusy
+		}
 		errCh = a.launch(ctx, slots, call, tok) // TODO mangle
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -494,7 +502,7 @@ func (a *agent) prepCold(ctx context.Context, slots chan<- slot, call *call, tok
 	// about timing out if this takes a while...
 	cookie, err := a.driver.Prepare(ctx, container)
 	if err != nil {
-		tok.Close() // TODO make this less brittle
+		tok.Close()
 		return err
 	}

View File

@@ -17,30 +17,35 @@ import (
 )
 
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
-// TODO: improve memory implementation
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
 	WaitAsyncResource() chan struct{}
-	GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken
+	// returns a closed channel if the resource can never be met.
+	GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken
 }
 
 type resourceTracker struct {
-	// cond protects access to ramUsed
+	// cond protects access to ram variables below
 	cond *sync.Cond
-	// ramTotal is the total accessible memory by this process
-	ramTotal uint64
-	// ramUsed is ram reserved for running containers. idle hot containers
-	// count against ramUsed.
-	ramUsed uint64
+	// ramSyncTotal is the total usable memory for sync functions
+	ramSyncTotal uint64
+	// ramSyncUsed is ram reserved for running sync containers including hot/idle
+	ramSyncUsed uint64
+	// ramAsyncTotal is the total usable memory for async + sync functions
+	ramAsyncTotal uint64
+	// ramAsyncUsed is ram reserved for running async + sync containers including hot/idle
+	ramAsyncUsed uint64
+	// memory in use for async area in which agent stops dequeuing async jobs
+	ramAsyncHWMark uint64
 }
 
 func NewResourceTracker() ResourceTracker {
 	obj := &resourceTracker{
 		cond: sync.NewCond(new(sync.Mutex)),
-		ramTotal: getAvailableMemory(),
 	}
+
+	obj.initializeMemory()
 	return obj
 }
@@ -58,18 +63,46 @@ func (t *resourceToken) Close() error {
 	return nil
 }
 
+func (a *resourceTracker) isResourceAvailableLocked(memory uint64, isAsync bool) bool {
+	asyncAvail := a.ramAsyncTotal - a.ramAsyncUsed
+	syncAvail := a.ramSyncTotal - a.ramSyncUsed
+
+	// For sync functions, we can steal from the async pool. For async, we restrict it to the async pool
+	if isAsync {
+		return asyncAvail >= memory
+	} else {
+		return asyncAvail+syncAvail >= memory
+	}
+}
+
+// is this request possible to meet? If no, fail quick
+func (a *resourceTracker) isResourcePossible(memory uint64, isAsync bool) bool {
+	if isAsync {
+		return memory <= a.ramAsyncTotal
+	} else {
+		return memory <= a.ramSyncTotal+a.ramAsyncTotal
+	}
+}
+
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-chan ResourceToken {
-	memory := call.Memory * 1024 * 1024
+func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken {
+	memory = memory * 1024 * 1024
 	c := a.cond
 	ch := make(chan ResourceToken)
 
+	if !a.isResourcePossible(memory, isAsync) {
+		close(ch)
+		return ch
+	}
+
 	go func() {
 		c.L.Lock()
-		for (a.ramUsed + memory) > a.ramTotal {
+		for !a.isResourceAvailableLocked(memory, isAsync) {
 			select {
 			case <-ctx.Done():
 				c.L.Unlock()
@@ -80,13 +113,33 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-ch
 			c.Wait()
 		}
 
-		a.ramUsed += memory
+		var asyncMem, syncMem uint64
+		if isAsync {
+			// async uses async pool only
+			asyncMem = memory
+		} else if a.ramSyncTotal-a.ramSyncUsed >= memory {
+			// if sync fits in sync pool
+			syncMem = memory
+		} else {
+			// if sync fits async + sync pool
+			syncMem = a.ramSyncTotal - a.ramSyncUsed
+			asyncMem = memory - syncMem
+		}
+
+		a.ramAsyncUsed += asyncMem
+		a.ramSyncUsed += syncMem
+
 		c.L.Unlock()
 
 		t := &resourceToken{decrement: func() {
 			c.L.Lock()
-			a.ramUsed -= memory
+			a.ramAsyncUsed -= asyncMem
+			a.ramSyncUsed -= syncMem
 			c.L.Unlock()
+
+			// WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but
+			// the cost of this spurious wake up is unlikely to impact much performance. Simpler
+			// to use one cond variable for the time being.
 			c.Broadcast()
 		}}
@@ -101,15 +154,15 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, call *call) <-ch
 	return ch
 }
 
-// GetAsyncResource will send a signal on the returned channel when at least half of
-// the available RAM on this machine is free.
+// WaitAsyncResource will send a signal on the returned channel when RAM in-use
+// in the async area is less than high water mark
 func (a *resourceTracker) WaitAsyncResource() chan struct{} {
 	ch := make(chan struct{})
 	c := a.cond
 	go func() {
 		c.L.Lock()
-		for (a.ramTotal/2)-a.ramUsed < 0 {
+		for a.ramSyncUsed >= a.ramAsyncHWMark {
 			c.Wait()
 		}
 		c.L.Unlock()
@@ -120,31 +173,100 @@ func (a *resourceTracker) WaitAsyncResource() chan struct{} {
 	return ch
 }
 
-func getAvailableMemory() uint64 {
-	const tooBig = 322122547200 // #300GB or 0, biggest aws instance is 244GB
-
-	var availableMemory uint64 = tooBig
+func minUint64(a, b uint64) uint64 {
+	if a <= b {
+		return a
+	}
+	return b
+}
+
+func maxUint64(a, b uint64) uint64 {
+	if a >= b {
+		return a
+	}
+	return b
+}
+
+func clampUint64(val, min, max uint64) uint64 {
+	val = minUint64(val, max)
+	val = maxUint64(val, min)
+	return val
+}
+
+func (a *resourceTracker) initializeMemory() {
+	var maxSyncMemory, maxAsyncMemory, ramAsyncHWMark uint64
+
 	if runtime.GOOS == "linux" {
-		var err error
-		availableMemory, err = checkCgroup()
+		// system wide available memory
+		totalMemory, err := checkProc()
+		if err != nil {
+			logrus.WithError(err).Fatal("Cannot get the proper memory information to size server.")
+		}
+		availMemory := totalMemory
+
+		// cgroup limit restriction on memory usage
+		cGroupLimit, err := checkCgroup()
 		if err != nil {
 			logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to host memory available..")
+		} else {
+			availMemory = minUint64(cGroupLimit, availMemory)
 		}
-		if availableMemory >= tooBig || availableMemory <= 0 {
-			// Then -m flag probably wasn't set, so use max available on system
-			availableMemory, err = checkProc()
-			if availableMemory >= tooBig || availableMemory <= 0 {
-				logrus.WithError(err).Fatal("Cannot get the proper memory information to size server. You must specify the maximum available memory by passing the -m command with docker run when starting the server via docker, eg: `docker run -m 2G ...`")
-			}
-		}
+
+		// clamp the available memory by head room (for docker, ourselves, other processes)
+		headRoom := getMemoryHeadRoom(availMemory)
+		availMemory = availMemory - headRoom
+
+		logrus.WithFields(logrus.Fields{
+			"totalMemory": totalMemory,
+			"availMemory": availMemory,
+			"headRoom":    headRoom,
+			"cgroupLimit": cGroupLimit,
+		}).Info("available memory")
+
+		// 20% of ram for sync only reserve
+		maxSyncMemory = uint64(availMemory * 2 / 10)
+		maxAsyncMemory = availMemory - maxSyncMemory
+		ramAsyncHWMark = maxAsyncMemory * 8 / 10
 	} else {
-		// This still lets 10-20 functions execute concurrently assuming a 2GB machine.
-		availableMemory = 2 * 1024 * 1024 * 1024
+		// non-linux: assume 512MB sync only memory and 1.5GB async + sync memory
+		maxSyncMemory = 512 * 1024 * 1024
+		maxAsyncMemory = (1024 + 512) * 1024 * 1024
+		ramAsyncHWMark = 1024 * 1024 * 1024
 	}
 
-	logrus.WithFields(logrus.Fields{"ram": availableMemory}).Info("available memory")
+	// For non-linux OS, we expect these (or their defaults) properly configured from command-line/env
+	logrus.WithFields(logrus.Fields{
+		"ramSync":        maxSyncMemory,
+		"ramAsync":       maxAsyncMemory,
+		"ramAsyncHWMark": ramAsyncHWMark,
+	}).Info("sync and async reservations")
 
-	return availableMemory
+	if maxSyncMemory == 0 || maxAsyncMemory == 0 {
+		logrus.Fatal("Cannot get the proper memory pool information to size server")
+	}
+
+	a.ramAsyncHWMark = ramAsyncHWMark
+	a.ramSyncTotal = maxSyncMemory
+	a.ramAsyncTotal = maxAsyncMemory
+}
+
+// headroom estimation in order not to consume entire RAM if possible
+func getMemoryHeadRoom(usableMemory uint64) uint64 {
+	// get 10% of the RAM
+	headRoom := uint64(usableMemory / 10)
+
+	// clamp this with 256MB min -- 5GB max
+	maxHeadRoom := uint64(5 * 1024 * 1024 * 1024)
+	minHeadRoom := uint64(256 * 1024 * 1024)
+	minHeadRoom = minUint64(minHeadRoom, usableMemory)
+
+	headRoom = clampUint64(headRoom, minHeadRoom, maxHeadRoom)
+	return headRoom
 }
 
 func checkCgroup() (uint64, error) {
@@ -154,11 +276,11 @@ func checkCgroup() (uint64, error) {
 	}
 	defer f.Close()
 
 	b, err := ioutil.ReadAll(f)
-	limBytes := string(b)
-	limBytes = strings.TrimSpace(limBytes)
 	if err != nil {
 		return 0, err
 	}
+
+	limBytes := string(b)
+	limBytes = strings.TrimSpace(limBytes)
 	return strconv.ParseUint(limBytes, 10, 64)
} }