mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
fn: async wait bugfix and low mem warnings (#649)
*) fix async high water mark logic *) warning on 256 mem for async and async+sync pools
This commit is contained in:
@@ -16,6 +16,11 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
Mem1MB = 1024 * 1024
|
||||||
|
Mem1GB = 1024 * 1024 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
|
// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
|
||||||
// TODO: add cpu, disk, network IO for future
|
// TODO: add cpu, disk, network IO for future
|
||||||
type ResourceTracker interface {
|
type ResourceTracker interface {
|
||||||
@@ -89,7 +94,7 @@ func (a *resourceTracker) isResourcePossible(memory uint64, isAsync bool) bool {
|
|||||||
// will close this token (i.e. the receiver should not call Close)
|
// will close this token (i.e. the receiver should not call Close)
|
||||||
func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken {
|
func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, isAsync bool) <-chan ResourceToken {
|
||||||
|
|
||||||
memory = memory * 1024 * 1024
|
memory = memory * Mem1MB
|
||||||
|
|
||||||
c := a.cond
|
c := a.cond
|
||||||
ch := make(chan ResourceToken)
|
ch := make(chan ResourceToken)
|
||||||
@@ -162,7 +167,7 @@ func (a *resourceTracker) WaitAsyncResource() chan struct{} {
|
|||||||
c := a.cond
|
c := a.cond
|
||||||
go func() {
|
go func() {
|
||||||
c.L.Lock()
|
c.L.Lock()
|
||||||
for a.ramSyncUsed >= a.ramAsyncHWMark {
|
for a.ramAsyncUsed >= a.ramAsyncHWMark {
|
||||||
c.Wait()
|
c.Wait()
|
||||||
}
|
}
|
||||||
c.L.Unlock()
|
c.L.Unlock()
|
||||||
@@ -236,9 +241,9 @@ func (a *resourceTracker) initializeMemory() {
|
|||||||
|
|
||||||
} else {
|
} else {
|
||||||
// non-linux: assume 512MB sync only memory and 1.5GB async + sync memory
|
// non-linux: assume 512MB sync only memory and 1.5GB async + sync memory
|
||||||
maxSyncMemory = 512 * 1024 * 1024
|
maxSyncMemory = 512 * Mem1MB
|
||||||
maxAsyncMemory = (1024 + 512) * 1024 * 1024
|
maxAsyncMemory = (1024 + 512) * Mem1MB
|
||||||
ramAsyncHWMark = 1024 * 1024 * 1024
|
ramAsyncHWMark = 1024 * Mem1MB
|
||||||
}
|
}
|
||||||
|
|
||||||
// For non-linux OS, we expect these (or their defaults) properly configured from command-line/env
|
// For non-linux OS, we expect these (or their defaults) properly configured from command-line/env
|
||||||
@@ -252,6 +257,12 @@ func (a *resourceTracker) initializeMemory() {
|
|||||||
logrus.Fatal("Cannot get the proper memory pool information to size server")
|
logrus.Fatal("Cannot get the proper memory pool information to size server")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if maxSyncMemory+maxAsyncMemory < 256*Mem1MB {
|
||||||
|
logrus.Warn("Severaly Limited memory: ramSync + ramAsync < 256MB")
|
||||||
|
} else if maxAsyncMemory < 256*Mem1MB {
|
||||||
|
logrus.Warn("Severaly Limited memory: ramAsync < 256MB")
|
||||||
|
}
|
||||||
|
|
||||||
a.ramAsyncHWMark = ramAsyncHWMark
|
a.ramAsyncHWMark = ramAsyncHWMark
|
||||||
a.ramSyncTotal = maxSyncMemory
|
a.ramSyncTotal = maxSyncMemory
|
||||||
a.ramAsyncTotal = maxAsyncMemory
|
a.ramAsyncTotal = maxAsyncMemory
|
||||||
@@ -264,8 +275,8 @@ func getMemoryHeadRoom(usableMemory uint64) (uint64, error) {
|
|||||||
headRoom := uint64(usableMemory / 10)
|
headRoom := uint64(usableMemory / 10)
|
||||||
|
|
||||||
// clamp this with 256MB min -- 5GB max
|
// clamp this with 256MB min -- 5GB max
|
||||||
maxHeadRoom := uint64(5 * 1024 * 1024 * 1024)
|
maxHeadRoom := uint64(5 * Mem1GB)
|
||||||
minHeadRoom := uint64(256 * 1024 * 1024)
|
minHeadRoom := uint64(256 * Mem1MB)
|
||||||
|
|
||||||
if minHeadRoom >= usableMemory {
|
if minHeadRoom >= usableMemory {
|
||||||
return 0, fmt.Errorf("Not enough memory: %v", usableMemory)
|
return 0, fmt.Errorf("Not enough memory: %v", usableMemory)
|
||||||
|
|||||||
83
api/agent/resource_test.go
Normal file
83
api/agent/resource_test.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
|
||||||
|
tr.cond.L.Lock()
|
||||||
|
|
||||||
|
tr.ramSyncTotal = vals.st
|
||||||
|
tr.ramSyncUsed = vals.su
|
||||||
|
tr.ramAsyncTotal = vals.at
|
||||||
|
tr.ramAsyncUsed = vals.au
|
||||||
|
tr.ramAsyncHWMark = vals.am
|
||||||
|
|
||||||
|
tr.cond.L.Unlock()
|
||||||
|
tr.cond.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
|
||||||
|
|
||||||
|
tr.cond.L.Lock()
|
||||||
|
|
||||||
|
vals.st = tr.ramSyncTotal
|
||||||
|
vals.su = tr.ramSyncUsed
|
||||||
|
vals.at = tr.ramAsyncTotal
|
||||||
|
vals.au = tr.ramAsyncUsed
|
||||||
|
vals.am = tr.ramAsyncHWMark
|
||||||
|
|
||||||
|
tr.cond.L.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
type trackerVals struct {
|
||||||
|
st uint64
|
||||||
|
su uint64
|
||||||
|
at uint64
|
||||||
|
au uint64
|
||||||
|
am uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResourceAsyncMem(t *testing.T) {
|
||||||
|
|
||||||
|
var vals trackerVals
|
||||||
|
|
||||||
|
trI := NewResourceTracker()
|
||||||
|
|
||||||
|
tr := trI.(*resourceTracker)
|
||||||
|
|
||||||
|
getTrackerTestVals(tr, &vals)
|
||||||
|
if vals.st <= 0 || vals.su != 0 || vals.at <= 0 || vals.au != 0 || vals.am <= 0 {
|
||||||
|
t.Fatalf("faulty init %#v", vals)
|
||||||
|
}
|
||||||
|
|
||||||
|
// set set these to known vals
|
||||||
|
vals.st = 1 * 1024 * 1024
|
||||||
|
vals.su = 0
|
||||||
|
vals.at = 2 * 1024 * 1024
|
||||||
|
vals.au = 0
|
||||||
|
vals.am = 1 * 1024 * 1024
|
||||||
|
|
||||||
|
// should block & wait
|
||||||
|
vals.au = vals.am
|
||||||
|
setTrackerTestVals(tr, &vals)
|
||||||
|
ch := tr.WaitAsyncResource()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
t.Fatal("high water mark over, should not trigger")
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
// should not block & wait
|
||||||
|
vals.au = 0
|
||||||
|
setTrackerTestVals(tr, &vals)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
t.Fatal("high water mark not over, should trigger")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user