Mirror of https://github.com/fnproject/fn.git, synced 2022-10-28 21:29:17 +03:00
* fn: non-blocking resource tracker and notification

  For some types of errors, we might want to notify the actual caller when the error is tied 1-1 to that request. If hotLauncher is triggered with a signaller, we hand it a back-communication channel for error notifications. This channel is passed to checkLaunch so that synchronous responses can be sent back to the caller that initiated the hot container launch. This is useful for running the agent in quick-fail mode: instead of waiting for CPU/Mem to become available, we prefer to fail fast so as not to hold up the caller. To support this, non-blocking resource tracker options/functions are now available.

* fn: test env var rename tweak
* fn: fixup merge
* fn: rebase test fix
* fn: merge fixup
* fn: test tweak down to 70MB for 128MB total
* fn: refactor token creation and use broadcast regardless
* fn: nb description
* fn: bugfix
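A minimal caller-side sketch of the quick-fail mode described above, using the GetResourceToken(ctx, mem, cpu, isAsync, isNB) shape and the Error()/Close() token methods exercised by the tests below (tracker, memMB, and cpuMilli are placeholder names, not identifiers from this repo):

	// Non-blocking acquisition: the channel yields a token promptly even when
	// the system is full, so the caller checks tok.Error() instead of waiting.
	ch := tracker.GetResourceToken(ctx, memMB, cpuMilli, false /* isAsync */, true /* isNB */)
	tok := <-ch
	if err := tok.Error(); err != nil {
		return err // quick-fail: don't hold up the caller waiting for CPU/Mem
	}
	defer tok.Close() // releases the reserved CPU/Mem back to the pools
	// ... launch the hot container with the reserved resources ...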
418 lines
9.0 KiB
Go
package agent

import (
	"context"
	"errors"
	"testing"
	"time"
)

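// setTrackerTestVals force-sets the tracker's RAM/CPU pool counters under the
// tracker lock, then broadcasts so that any blocked waiters re-evaluate.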
func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
	tr.cond.L.Lock()

	tr.ramSyncTotal = vals.mst
	tr.ramSyncUsed = vals.msu
	tr.ramAsyncTotal = vals.mat
	tr.ramAsyncUsed = vals.mau
	tr.ramAsyncHWMark = vals.mam

	tr.cpuSyncTotal = vals.cst
	tr.cpuSyncUsed = vals.csu
	tr.cpuAsyncTotal = vals.cat
	tr.cpuAsyncUsed = vals.cau
	tr.cpuAsyncHWMark = vals.cam

	tr.cond.L.Unlock()
	tr.cond.Broadcast()
}

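// getTrackerTestVals snapshots the tracker's RAM/CPU pool counters under the
// tracker lock.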
func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
	tr.cond.L.Lock()

	vals.mst = tr.ramSyncTotal
	vals.msu = tr.ramSyncUsed
	vals.mat = tr.ramAsyncTotal
	vals.mau = tr.ramAsyncUsed
	vals.mam = tr.ramAsyncHWMark

	vals.cst = tr.cpuSyncTotal
	vals.csu = tr.cpuSyncUsed
	vals.cat = tr.cpuAsyncTotal
	vals.cau = tr.cpuAsyncUsed
	vals.cam = tr.cpuAsyncHWMark

	tr.cond.L.Unlock()
}

// trackerVals mirrors the resourceTracker RAM/CPU fields; a helper for tests
// and debug printing.
type trackerVals struct {
	mst uint64 // ramSyncTotal
	msu uint64 // ramSyncUsed
	mat uint64 // ramAsyncTotal
	mau uint64 // ramAsyncUsed
	mam uint64 // ramAsyncHWMark
	cst uint64 // cpuSyncTotal
	csu uint64 // cpuSyncUsed
	cat uint64 // cpuAsyncTotal
	cau uint64 // cpuAsyncUsed
	cam uint64 // cpuAsyncHWMark
}

func (vals *trackerVals) setDefaults() {
	// set these to known vals (4GB total: 1GB sync, 3GB async)
	vals.mst = 1 * Mem1GB
	vals.msu = 0
	vals.mat = 3 * Mem1GB
	vals.mau = 0
	vals.mam = 1 * Mem1GB

	// let's assume 10 CPUs (2 CPUs sync, 8 CPUs async)
	vals.cst = 2000
	vals.csu = 0
	vals.cat = 8000
	vals.cau = 0
	vals.cam = 6000
}

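// fetchToken waits up to 500ms for a token to arrive on ch; it returns an
// error on timeout.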
func fetchToken(ch <-chan ResourceToken) (ResourceToken, error) {
	select {
	case tok := <-ch:
		return tok, nil
	case <-time.After(500 * time.Millisecond):
		return nil, errors.New("expected token")
	}
}

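// isClosed performs a non-blocking receive on ch and reports whether the
// channel is closed. Note that an open channel's pending token, if any, is
// consumed by the check.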
func isClosed(ch <-chan ResourceToken) bool {
	select {
	case _, ok := <-ch:
		if !ok {
			return true
		}
	default:
	}
	return false
}

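// TestResourceAsyncWait verifies that WaitAsyncResource blocks while async MEM
// or CPU usage sits at its high-water mark, and fires once usage drops below it.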
func TestResourceAsyncWait(t *testing.T) {
	var vals trackerVals

	trI := NewResourceTracker(nil)
	tr := trI.(*resourceTracker)

	getTrackerTestVals(tr, &vals)
	if vals.mst <= 0 || vals.msu != 0 || vals.mat <= 0 || vals.mau != 0 || vals.mam <= 0 {
		t.Fatalf("faulty init MEM %#v", vals)
	}
	if vals.cst <= 0 || vals.csu != 0 || vals.cat <= 0 || vals.cau != 0 || vals.cam <= 0 {
		t.Fatalf("faulty init CPU %#v", vals)
	}

	vals.setDefaults()

	// should block & wait
	vals.mau = vals.mam
	setTrackerTestVals(tr, &vals)

	ctx1, cancel1 := context.WithCancel(context.Background())
	ch1 := tr.WaitAsyncResource(ctx1)
	defer cancel1()

	select {
	case <-ch1:
		t.Fatal("high water mark MEM over, should not trigger")
	case <-time.After(500 * time.Millisecond):
	}

	// should not block & wait
	vals.mau = 0
	setTrackerTestVals(tr, &vals)

	select {
	case <-ch1:
	case <-time.After(500 * time.Millisecond):
		t.Fatal("high water mark MEM not over, should trigger")
	}

	// get a new channel to prevent previous test interference
	ctx2, cancel2 := context.WithCancel(context.Background())
	ch2 := tr.WaitAsyncResource(ctx2)
	defer cancel2()

	// should block & wait
	vals.cau = vals.cam
	setTrackerTestVals(tr, &vals)

	select {
	case <-ch2:
		t.Fatal("high water mark CPU over, should not trigger")
	case <-time.After(500 * time.Millisecond):
	}

	// should not block & wait
	vals.cau = 0
	setTrackerTestVals(tr, &vals)

	select {
	case <-ch2:
	case <-time.After(500 * time.Millisecond):
		t.Fatal("high water mark CPU not over, should trigger")
	}
}

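// TestResourceGetSimple exercises the blocking token path: a full system hands
// out no token, freeing capacity unblocks a pending request, and closing all
// tokens returns the pools to empty.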
func TestResourceGetSimple(t *testing.T) {
	var vals trackerVals
	trI := NewResourceTracker(nil)
	tr := trI.(*resourceTracker)

	vals.setDefaults()

	// let's make it like CPU and MEM are 100% full
	vals.mau = vals.mat
	vals.cau = vals.cat

	setTrackerTestVals(tr, &vals)

	// ask for 4GB and 1 CPU
	ctx, cancel := context.WithCancel(context.Background())
	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
	defer cancel()

	_, err := fetchToken(ch)
	if err == nil {
		t.Fatalf("full system should not hand out token")
	}

	// reset back
	vals.setDefaults()
	setTrackerTestVals(tr, &vals)

	tok, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("empty system should hand out token")
	}

	// ask for another 4GB and 1 CPU
	ctx, cancel = context.WithCancel(context.Background())
	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
	defer cancel()

	_, err = fetchToken(ch)
	if err == nil {
		t.Fatalf("full system should not hand out token")
	}

	// close means the giant token's resources are released
	tok.Close()

	tok, err = fetchToken(ch)
	if err != nil {
		t.Fatalf("empty system should hand out token")
	}

	tok.Close()

	// POOLS should all be empty now
	getTrackerTestVals(tr, &vals)
	if vals.msu != 0 || vals.mau != 0 {
		t.Fatalf("faulty state MEM %#v", vals)
	}
	if vals.csu != 0 || vals.cau != 0 {
		t.Fatalf("faulty state CPU %#v", vals)
	}
}

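// TestResourceGetSimpleNB exercises the same sequence in non-blocking mode:
// the channel always yields a token promptly, and callers inspect tok.Error()
// to learn whether the reservation succeeded.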
func TestResourceGetSimpleNB(t *testing.T) {
	var vals trackerVals
	trI := NewResourceTracker(nil)
	tr := trI.(*resourceTracker)

	vals.setDefaults()

	// let's make it like CPU and MEM are 100% full
	vals.mau = vals.mat
	vals.cau = vals.cat

	setTrackerTestVals(tr, &vals)

	// ask for 4GB and 1 CPU
	ctx, cancel := context.WithCancel(context.Background())
	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
	defer cancel()

	tok := <-ch
	if tok.Error() == nil {
		t.Fatalf("full system should not hand out token")
	}

	// reset back
	vals.setDefaults()
	setTrackerTestVals(tr, &vals)

	tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
	if tok1.Error() != nil {
		t.Fatalf("empty system should hand out token")
	}

	// ask for another 4GB and 1 CPU
	ctx, cancel = context.WithCancel(context.Background())
	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
	defer cancel()

	tok = <-ch
	if tok.Error() == nil {
		t.Fatalf("full system should not hand out token")
	}

	// close means the giant token's resources are released
	tok1.Close()

	tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
	if tok.Error() != nil {
		t.Fatalf("empty system should hand out token")
	}

	tok.Close()

	// POOLS should all be empty now
	getTrackerTestVals(tr, &vals)
	if vals.msu != 0 || vals.mau != 0 {
		t.Fatalf("faulty state MEM %#v", vals)
	}
	if vals.csu != 0 || vals.cau != 0 {
		t.Fatalf("faulty state CPU %#v", vals)
	}
}

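// TestResourceGetCombo walks the sync and async pools through a mix of
// requests: an impossible request times out, the async pool fills and drains,
// sync requests spill over into free async capacity, and double-closing a
// token is a no-op.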
func TestResourceGetCombo(t *testing.T) {
	var vals trackerVals
	trI := NewResourceTracker(nil)
	tr := trI.(*resourceTracker)

	vals.setDefaults()
	setTrackerTestVals(tr, &vals)

	// impossible request
	ctx, cancel := context.WithCancel(context.Background())
	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false)
	_, err := fetchToken(ch)
	if err == nil {
		t.Fatalf("impossible request should never return (error here)")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// let's use up 2GB of the 3GB async pool
	ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false)
	tok1, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("empty async system should hand out token1")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// remaining 1GB async
	ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false)
	tok2, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("empty async system should hand out token2")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// NOW ASYNC POOL IS FULL
	// SYNC POOL HAS 1GB

	// we can no longer get an async token
	ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false)
	_, err = fetchToken(ch)
	if err == nil {
		t.Fatalf("full async system should not hand out a token")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// but we should get a 1GB sync token
	ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false)
	tok3, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("empty sync system should hand out token3")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// NOW ASYNC AND SYNC POOLS ARE FULL

	// this should fail
	ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false)
	_, err = fetchToken(ch)
	if err == nil {
		t.Fatalf("full system should not hand out a token")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// now let's free up some async pool, release tok2 (1GB)
	tok2.Close()

	// NOW ASYNC POOL HAS 1GB FREE
	// SYNC POOL IS FULL

	// async pool should provide this
	ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false)
	tok4, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("async system should hand out token4")
	}

	cancel()
	ctx, cancel = context.WithCancel(context.Background())

	// NOW ASYNC AND SYNC POOLS ARE FULL

	tok4.Close()
	tok3.Close()

	// NOW ASYNC POOL HAS 1GB FREE
	// SYNC POOL HAS 1GB FREE

	// now we ask for a 2GB sync token; it should be provided from both async+sync pools
	ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false)
	tok5, err := fetchToken(ch)
	if err != nil {
		t.Fatalf("async+sync system should hand out token5")
	}

	cancel()

	// NOW ASYNC AND SYNC POOLS ARE FULL

	tok1.Close()
	tok5.Close()

	// attempt to close tok2 twice... this should be OK
	tok2.Close()

	// POOLS should all be empty now
	getTrackerTestVals(tr, &vals)
	if vals.msu != 0 || vals.mau != 0 {
		t.Fatalf("faulty state MEM %#v", vals)
	}
	if vals.csu != 0 || vals.cau != 0 {
		t.Fatalf("faulty state CPU %#v", vals)
	}
}