Files
fn-serverless/api/agent/resource_test.go
Tolga Ceylan 54ba49be65 fn: non-blocking resource tracker and notification (#841)
* fn: non-blocking resource tracker and notification

For some types of errors, we might want to notify
the actual caller if the error is directly 1-1 tied
to that request. If hotLauncher is triggered with
signaller, then here we send a back communication
error notification channel. This is passed to
checkLaunch to send back synchronous responses
to the caller that initiated this hot container
launch.

This is useful if we want to run the agent in
quick fail mode, where instead of waiting for
CPU/Mem to become available, we prefer to fail
quick in order not to hold up the caller.
To support this, non-blocking resource tracker
option/functions are now available.

* fn: test env var rename tweak

* fn: fixup merge

* fn: rebase test fix

* fn: merge fixup

* fn: test tweak down to 70MB for 128MB total

* fn: refactor token creation and use broadcast regardless

* fn: nb description

* fn: bugfix
2018-04-24 21:59:33 -07:00

418 lines
9.0 KiB
Go

package agent
import (
"context"
"errors"
"testing"
"time"
)
func setTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
tr.cond.L.Lock()
tr.ramSyncTotal = vals.mst
tr.ramSyncUsed = vals.msu
tr.ramAsyncTotal = vals.mat
tr.ramAsyncUsed = vals.mau
tr.ramAsyncHWMark = vals.mam
tr.cpuSyncTotal = vals.cst
tr.cpuSyncUsed = vals.csu
tr.cpuAsyncTotal = vals.cat
tr.cpuAsyncUsed = vals.cau
tr.cpuAsyncHWMark = vals.cam
tr.cond.L.Unlock()
tr.cond.Broadcast()
}
func getTrackerTestVals(tr *resourceTracker, vals *trackerVals) {
tr.cond.L.Lock()
vals.mst = tr.ramSyncTotal
vals.msu = tr.ramSyncUsed
vals.mat = tr.ramAsyncTotal
vals.mau = tr.ramAsyncUsed
vals.mam = tr.ramAsyncHWMark
vals.cst = tr.cpuSyncTotal
vals.csu = tr.cpuSyncUsed
vals.cat = tr.cpuAsyncTotal
vals.cau = tr.cpuAsyncUsed
vals.cam = tr.cpuAsyncHWMark
tr.cond.L.Unlock()
}
// helper to debug print (fields correspond to resourceTracker CPU/MEM fields)
type trackerVals struct {
mst uint64
msu uint64
mat uint64
mau uint64
mam uint64
cst uint64
csu uint64
cat uint64
cau uint64
cam uint64
}
func (vals *trackerVals) setDefaults() {
// set set these to known vals (4GB total: 1GB sync, 3 async)
vals.mst = 1 * Mem1GB
vals.msu = 0
vals.mat = 3 * Mem1GB
vals.mau = 0
vals.mam = 1 * Mem1GB
// let's assume 10 CPUs (2 CPU sync, 8 CPU async)
vals.cst = 2000
vals.csu = 0
vals.cat = 8000
vals.cau = 0
vals.cam = 6000
}
func fetchToken(ch <-chan ResourceToken) (ResourceToken, error) {
select {
case tok := <-ch:
return tok, nil
case <-time.After(time.Duration(500) * time.Millisecond):
return nil, errors.New("expected token")
}
}
func isClosed(ch <-chan ResourceToken) bool {
select {
case _, ok := <-ch:
if !ok {
return true
}
default:
}
return false
}
func TestResourceAsyncWait(t *testing.T) {
var vals trackerVals
trI := NewResourceTracker(nil)
tr := trI.(*resourceTracker)
getTrackerTestVals(tr, &vals)
if vals.mst <= 0 || vals.msu != 0 || vals.mat <= 0 || vals.mau != 0 || vals.mam <= 0 {
t.Fatalf("faulty init MEM %#v", vals)
}
if vals.cst <= 0 || vals.csu != 0 || vals.cat <= 0 || vals.cau != 0 || vals.cam <= 0 {
t.Fatalf("faulty init CPU %#v", vals)
}
vals.setDefaults()
// should block & wait
vals.mau = vals.mam
setTrackerTestVals(tr, &vals)
ctx1, cancel1 := context.WithCancel(context.Background())
ch1 := tr.WaitAsyncResource(ctx1)
defer cancel1()
select {
case <-ch1:
t.Fatal("high water mark MEM over, should not trigger")
case <-time.After(time.Duration(500) * time.Millisecond):
}
// should not block & wait
vals.mau = 0
setTrackerTestVals(tr, &vals)
select {
case <-ch1:
case <-time.After(time.Duration(500) * time.Millisecond):
t.Fatal("high water mark MEM not over, should trigger")
}
// get a new channel to prevent previous test interference
ctx2, cancel2 := context.WithCancel(context.Background())
ch2 := tr.WaitAsyncResource(ctx2)
defer cancel2()
// should block & wait
vals.cau = vals.cam
setTrackerTestVals(tr, &vals)
select {
case <-ch2:
t.Fatal("high water mark CPU over, should not trigger")
case <-time.After(time.Duration(500) * time.Millisecond):
}
// should not block & wait
vals.cau = 0
setTrackerTestVals(tr, &vals)
select {
case <-ch2:
case <-time.After(time.Duration(500) * time.Millisecond):
t.Fatal("high water mark CPU not over, should trigger")
}
}
func TestResourceGetSimple(t *testing.T) {
var vals trackerVals
trI := NewResourceTracker(nil)
tr := trI.(*resourceTracker)
vals.setDefaults()
// let's make it like CPU and MEM are 100% full
vals.mau = vals.mat
vals.cau = vals.cat
setTrackerTestVals(tr, &vals)
// ask for 4GB and 10 CPU
ctx, cancel := context.WithCancel(context.Background())
ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
defer cancel()
_, err := fetchToken(ch)
if err == nil {
t.Fatalf("full system should not hand out token")
}
// reset back
vals.setDefaults()
setTrackerTestVals(tr, &vals)
tok, err := fetchToken(ch)
if err != nil {
t.Fatalf("empty system should hand out token")
}
// ask for another 4GB and 10 CPU
ctx, cancel = context.WithCancel(context.Background())
ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
defer cancel()
_, err = fetchToken(ch)
if err == nil {
t.Fatalf("full system should not hand out token")
}
// close means, giant token resources released
tok.Close()
tok, err = fetchToken(ch)
if err != nil {
t.Fatalf("empty system should hand out token")
}
tok.Close()
// POOLS should all be empty now
getTrackerTestVals(tr, &vals)
if vals.msu != 0 || vals.mau != 0 {
t.Fatalf("faulty state MEM %#v", vals)
}
if vals.csu != 0 || vals.cau != 0 {
t.Fatalf("faulty state CPU %#v", vals)
}
}
func TestResourceGetSimpleNB(t *testing.T) {
var vals trackerVals
trI := NewResourceTracker(nil)
tr := trI.(*resourceTracker)
vals.setDefaults()
// let's make it like CPU and MEM are 100% full
vals.mau = vals.mat
vals.cau = vals.cat
setTrackerTestVals(tr, &vals)
// ask for 4GB and 10 CPU
ctx, cancel := context.WithCancel(context.Background())
ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
defer cancel()
tok := <-ch
if tok.Error() == nil {
t.Fatalf("full system should not hand out token")
}
// reset back
vals.setDefaults()
setTrackerTestVals(tr, &vals)
tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
if tok1.Error() != nil {
t.Fatalf("empty system should hand out token")
}
// ask for another 4GB and 10 CPU
ctx, cancel = context.WithCancel(context.Background())
ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
defer cancel()
tok = <-ch
if tok.Error() == nil {
t.Fatalf("full system should not hand out token")
}
// close means, giant token resources released
tok1.Close()
tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
if tok.Error() != nil {
t.Fatalf("empty system should hand out token")
}
tok.Close()
// POOLS should all be empty now
getTrackerTestVals(tr, &vals)
if vals.msu != 0 || vals.mau != 0 {
t.Fatalf("faulty state MEM %#v", vals)
}
if vals.csu != 0 || vals.cau != 0 {
t.Fatalf("faulty state CPU %#v", vals)
}
}
func TestResourceGetCombo(t *testing.T) {
var vals trackerVals
trI := NewResourceTracker(nil)
tr := trI.(*resourceTracker)
vals.setDefaults()
setTrackerTestVals(tr, &vals)
// impossible request
ctx, cancel := context.WithCancel(context.Background())
ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false)
_, err := fetchToken(ch)
if err == nil {
t.Fatalf("impossible request should never return (error here)")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// let's use up 2 GB of 3GB async pool
ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false)
tok1, err := fetchToken(ch)
if err != nil {
t.Fatalf("empty async system should hand out token1")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// remaining 1 GB async
ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false)
tok2, err := fetchToken(ch)
if err != nil {
t.Fatalf("empty async system should hand out token2")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// NOW ASYNC POOL IS FULL
// SYNC POOL HAS 1GB
// we no longer can get async token
ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false)
_, err = fetchToken(ch)
if err == nil {
t.Fatalf("full async system should not hand out a token")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// but we should get 1GB sync token
ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false)
tok3, err := fetchToken(ch)
if err != nil {
t.Fatalf("empty sync system should hand out token3")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// NOW ASYNC AND SYNC POOLS ARE FULL
// this should fail
ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false)
_, err = fetchToken(ch)
if err == nil {
t.Fatalf("full system should not hand out a token")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// now let's free up some async pool, release tok2 (1GB)
tok2.Close()
// NOW ASYNC POOL HAS 1GB FREE
// SYNC POOL IS FULL
// async pool should provide this
ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false)
tok4, err := fetchToken(ch)
if err != nil {
t.Fatalf("async system should hand out token4")
}
cancel()
ctx, cancel = context.WithCancel(context.Background())
// NOW ASYNC AND SYNC POOLS ARE FULL
tok4.Close()
tok3.Close()
// NOW ASYNC POOL HAS 1GB FREE
// SYNC POOL HAS 1GB FREE
// now, we ask for 2GB sync token, it should be provided from both async+sync pools
ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false)
tok5, err := fetchToken(ch)
if err != nil {
t.Fatalf("async+sync system should hand out token5")
}
cancel()
// NOW ASYNC AND SYNC POOLS ARE FULL
tok1.Close()
tok5.Close()
// attempt to close tok2 twice.. This should be OK.
tok2.Close()
// POOLS should all be empty now
getTrackerTestVals(tr, &vals)
if vals.msu != 0 || vals.mau != 0 {
t.Fatalf("faulty state MEM %#v", vals)
}
if vals.csu != 0 || vals.cau != 0 {
t.Fatalf("faulty state CPU %#v", vals)
}
}