From 7185f8306a8dd8ed3c18c18f19021c0a960d615f Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 5 Jan 2018 10:27:26 -0800 Subject: [PATCH] fn: async wait bugfix and low mem warnings (#649) *) fix async high water mark logic *) warning on 256 mem for async and async+sync pools --- api/agent/resource.go | 25 ++++++++---- api/agent/resource_test.go | 83 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 7 deletions(-) create mode 100644 api/agent/resource_test.go diff --git a/api/agent/resource.go b/api/agent/resource.go index a40c905bb..f1225a657 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -16,6 +16,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + Mem1MB = 1024 * 1024 + Mem1GB = 1024 * 1024 * 1024 +) + // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. // TODO: add cpu, disk, network IO for future type ResourceTracker interface { @@ -89,7 +94,7 @@ func (a *resourceTracker) isResourcePossible(memory uint64, isAsync bool) bool { // will close this token (i.e. the receiver should not call Close) func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken { - memory = memory * 1024 * 1024 + memory = memory * Mem1MB c := a.cond ch := make(chan ResourceToken) @@ -162,7 +167,7 @@ func (a *resourceTracker) WaitAsyncResource() chan struct{} { c := a.cond go func() { c.L.Lock() - for a.ramSyncUsed >= a.ramAsyncHWMark { + for a.ramAsyncUsed >= a.ramAsyncHWMark { c.Wait() } c.L.Unlock() @@ -236,9 +241,9 @@ func (a *resourceTracker) initializeMemory() { } else { // non-linux: assume 512MB sync only memory and 1.5GB async + sync memory - maxSyncMemory = 512 * 1024 * 1024 - maxAsyncMemory = (1024 + 512) * 1024 * 1024 - ramAsyncHWMark = 1024 * 1024 * 1024 + maxSyncMemory = 512 * Mem1MB + maxAsyncMemory = (1024 + 512) * Mem1MB + ramAsyncHWMark = 1024 * Mem1MB } // For non-linux OS, we expect these (or their defaults) properly configured from command-line/env @@ -252,6 +257,12 @@ func (a *resourceTracker) initializeMemory() { logrus.Fatal("Cannot get the proper memory pool information to size server") } + if maxSyncMemory+maxAsyncMemory < 256*Mem1MB { + logrus.Warn("Severaly Limited memory: ramSync + ramAsync < 256MB") + } else if maxAsyncMemory < 256*Mem1MB { + logrus.Warn("Severaly Limited memory: ramAsync < 256MB") + } + a.ramAsyncHWMark = ramAsyncHWMark a.ramSyncTotal = maxSyncMemory a.ramAsyncTotal = maxAsyncMemory @@ -264,8 +275,8 @@ func getMemoryHeadRoom(usableMemory uint64) (uint64, error) { headRoom := uint64(usableMemory / 10) // clamp this with 256MB min -- 5GB max - maxHeadRoom := uint64(5 * 1024 * 1024 * 1024) - minHeadRoom := uint64(256 * 1024 * 1024) + maxHeadRoom := uint64(5 * Mem1GB) + minHeadRoom := uint64(256 * Mem1MB) if minHeadRoom >= usableMemory { return 0, fmt.Errorf("Not enough memory: %v", usableMemory) diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go new file mode 100644 index 000000000..2087a9062 --- /dev/null +++ b/api/agent/resource_test.go @@ -0,0 +1,83 @@ +package agent + +import ( + "testing" + "time" +) + +func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) { + tr.cond.L.Lock() + + tr.ramSyncTotal = vals.st + tr.ramSyncUsed = vals.su + tr.ramAsyncTotal = vals.at + tr.ramAsyncUsed = vals.au + tr.ramAsyncHWMark = vals.am + + tr.cond.L.Unlock() + tr.cond.Broadcast() +} + +func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) { + + tr.cond.L.Lock() + + vals.st = tr.ramSyncTotal + vals.su = tr.ramSyncUsed + vals.at = tr.ramAsyncTotal + vals.au = tr.ramAsyncUsed + vals.am = tr.ramAsyncHWMark + + tr.cond.L.Unlock() +} + +type trackerVals struct { + st uint64 + su uint64 + at uint64 + au uint64 + am uint64 +} + +func TestResourceAsyncMem(t *testing.T) { + + var vals trackerVals + + trI := NewResourceTracker() + + tr := trI.(*resourceTracker) + + getTrackerTestVals(tr, &vals) + if vals.st <= 0 || vals.su != 0 || vals.at <= 0 || vals.au != 0 || vals.am <= 0 { + t.Fatalf("faulty init %#v", vals) + } + + // set set these to known vals + vals.st = 1 * 1024 * 1024 + vals.su = 0 + vals.at = 2 * 1024 * 1024 + vals.au = 0 + vals.am = 1 * 1024 * 1024 + + // should block & wait + vals.au = vals.am + setTrackerTestVals(tr, &vals) + ch := tr.WaitAsyncResource() + + select { + case <-ch: + t.Fatal("high water mark over, should not trigger") + case <-time.After(time.Duration(500) * time.Millisecond): + } + + // should not block & wait + vals.au = 0 + setTrackerTestVals(tr, &vals) + + select { + case <-ch: + case <-time.After(time.Duration(500) * time.Millisecond): + t.Fatal("high water mark not over, should trigger") + } + +}