From 54ba49be65c274d2ae52c115035211549079348e Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Tue, 24 Apr 2018 21:59:33 -0700 Subject: [PATCH] fn: non-blocking resource tracker and notification (#841) * fn: non-blocking resource tracker and notification For some types of errors, we might want to notify the actual caller if the error is directly 1-1 tied to that request. If hotLauncher is triggered with signaller, then here we send a back communication error notification channel. This is passed to checkLaunch to send back synchronous responses to the caller that initiated this hot container launch. This is useful if we want to run the agent in quick fail mode, where instead of waiting for CPU/Mem to become available, we prefer to fail quick in order not to hold up the caller. To support this, non-blocking resource tracker option/functions are now available. * fn: test env var rename tweak * fn: fixup merge * fn: rebase test fix * fn: merge fixup * fn: test tweak down to 70MB for 128MB total * fn: refactor token creation and use broadcast regardless * fn: nb description * fn: bugfix --- api/agent/agent.go | 58 ++++++++++++++-- api/agent/config.go | 76 +++++++++++---------- api/agent/resource.go | 135 ++++++++++++++++++++++++++----------- api/agent/resource_test.go | 83 ++++++++++++++++++++--- api/agent/slots.go | 4 +- api/server/runner_test.go | 95 ++++++++++++++++++++++++++ 6 files changed, 359 insertions(+), 92 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index 7ae866d09..6bfd50f94 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -315,6 +315,8 @@ func transformTimeout(e error, isRetriable bool) error { return models.ErrCallTimeoutServerBusy } return models.ErrCallTimeout + } else if e == CapacityFull { + return models.ErrCallTimeoutServerBusy } return e } @@ -402,9 +404,12 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) { ctx, span := trace.StartSpan(ctx, "agent_hot_launcher") defer span.End() + var notifyChan chan error + for { ctx, cancel := context.WithTimeout(ctx, timeout) - a.checkLaunch(ctx, call) + a.checkLaunch(ctx, call, notifyChan) + notifyChan = nil select { case <-a.shutWg.Closer(): // server shutdown @@ -416,15 +421,25 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) { logger.Info("Hot function launcher timed out") return } - case <-call.slots.signaller: + case notifyChan = <-call.slots.signaller: cancel() } } } -func (a *agent) checkLaunch(ctx context.Context, call *call) { +func tryNotify(notifyChan chan error, err error) { + if notifyChan != nil && err != nil { + select { + case notifyChan <- err: + default: + } + } +} + +func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) { curStats := call.slots.getStats() isAsync := call.Type == models.TypeAsync + isNB := a.cfg.EnableNBResourceTracker isNeeded := isNewContainerNeeded(&curStats) common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Debug("Hot function launcher stats") if !isNeeded { @@ -436,9 +451,28 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) { common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container") + // WARNING: Tricky flow below. We are here because: isNeeded is set, + // in other words, we need to launch a new container at this time due to high load. + // + // For non-blocking mode, this means, if we cannot acquire resources (cpu+mem), then we need + // to notify the caller through notifyChan. This is not perfect as the callers and + // checkLaunch do not match 1-1. But this is OK, we can notify *any* waiter that + // has signalled us, this is because non-blocking mode is a system wide setting. + // The notifications are lossy, but callers will signal/poll again if this is the case + // or this may not matter if they've already acquired an empty slot. + // + // For Non-blocking mode, a.cfg.HotPoll should not be set to too high since a missed + // notify event from here will add a.cfg.HotPoll msec latency. Setting a.cfg.HotPoll may + // be an acceptable workaround for the short term since non-blocking mode likely to reduce + // the number of waiters which perhaps could compensate for more frequent polling. + // + // Non-blocking mode only applies to cpu+mem, and if isNeeded decided that we do not + // need to start a new container, then waiters will wait. select { - case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): - if a.shutWg.AddSession(1) { + case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB): + if tok != nil && tok.Error() != nil { + tryNotify(notifyChan, tok.Error()) + } else if a.shutWg.AddSession(1) { go func() { // NOTE: runHot will not inherit the timeout from ctx (ignore timings) a.runHot(ctx, call, tok, state) @@ -466,12 +500,16 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { ch := call.slots.startDequeuer(ctx) + notifyChan := make(chan error) + // 1) if we can get a slot immediately, grab it. // 2) if we don't, send a signaller every x msecs until we do. sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately for { select { + case err := <-notifyChan: + return nil, err case s := <-ch: if call.slots.acquireSlot(s) { if s.slot.Error() != nil { @@ -493,7 +531,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { sleep = a.cfg.HotPoll // send a notification to launchHot() select { - case call.slots.signaller <- true: + case call.slots.signaller <- notifyChan: default: } } @@ -503,6 +541,8 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { // returns the slot for that new container to run the request on. func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { isAsync := call.Type == models.TypeAsync + isNB := a.cfg.EnableNBResourceTracker + ch := make(chan Slot) ctx, span := trace.StartSpan(ctx, "agent_launch_cold") @@ -511,7 +551,11 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { call.containerState.UpdateState(ctx, ContainerStateWait, call.slots) select { - case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync): + case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB): + if tok.Error() != nil { + return nil, tok.Error() + } + go a.prepCold(ctx, call, tok, ch) case <-ctx.Done(): return nil, ctx.Err() diff --git a/api/agent/config.go b/api/agent/config.go index 5b2fabef4..eceadbaeb 100644 --- a/api/agent/config.go +++ b/api/agent/config.go @@ -9,44 +9,46 @@ import ( ) type AgentConfig struct { - MinDockerVersion string `json:"min_docker_version"` - FreezeIdle time.Duration `json:"freeze_idle_msecs"` - EjectIdle time.Duration `json:"eject_idle_msecs"` - HotPoll time.Duration `json:"hot_poll_msecs"` - HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"` - AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"` - CallEndTimeout time.Duration `json:"call_end_timeout"` - MaxCallEndStacking uint64 `json:"max_call_end_stacking"` - MaxResponseSize uint64 `json:"max_response_size_bytes"` - MaxLogSize uint64 `json:"max_log_size_bytes"` - MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"` - MaxTotalMemory uint64 `json:"max_total_memory_bytes"` - MaxFsSize uint64 `json:"max_fs_size_mb"` - PreForkPoolSize uint64 `json:"pre_fork_pool_size"` - PreForkImage string `json:"pre_fork_image"` - PreForkCmd string `json:"pre_fork_pool_cmd"` - PreForkUseOnce uint64 `json:"pre_fork_use_once"` - PreForkNetworks string `json:"pre_fork_networks"` + MinDockerVersion string `json:"min_docker_version"` + FreezeIdle time.Duration `json:"freeze_idle_msecs"` + EjectIdle time.Duration `json:"eject_idle_msecs"` + HotPoll time.Duration `json:"hot_poll_msecs"` + HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"` + AsyncChewPoll time.Duration `json:"async_chew_poll_msecs"` + CallEndTimeout time.Duration `json:"call_end_timeout"` + MaxCallEndStacking uint64 `json:"max_call_end_stacking"` + MaxResponseSize uint64 `json:"max_response_size_bytes"` + MaxLogSize uint64 `json:"max_log_size_bytes"` + MaxTotalCPU uint64 `json:"max_total_cpu_mcpus"` + MaxTotalMemory uint64 `json:"max_total_memory_bytes"` + MaxFsSize uint64 `json:"max_fs_size_mb"` + PreForkPoolSize uint64 `json:"pre_fork_pool_size"` + PreForkImage string `json:"pre_fork_image"` + PreForkCmd string `json:"pre_fork_pool_cmd"` + PreForkUseOnce uint64 `json:"pre_fork_use_once"` + PreForkNetworks string `json:"pre_fork_networks"` + EnableNBResourceTracker bool `json:"enable_nb_resource_tracker"` } const ( - EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" - EnvEjectIdle = "FN_EJECT_IDLE_MSECS" - EnvHotPoll = "FN_HOT_POLL_MSECS" - EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS" - EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS" - EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS" - EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING" - EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE" - EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES" - EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" - EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" - EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" - EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" - EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" - EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" - EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" - EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" + EnvFreezeIdle = "FN_FREEZE_IDLE_MSECS" + EnvEjectIdle = "FN_EJECT_IDLE_MSECS" + EnvHotPoll = "FN_HOT_POLL_MSECS" + EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS" + EnvAsyncChewPoll = "FN_ASYNC_CHEW_POLL_MSECS" + EnvCallEndTimeout = "FN_CALL_END_TIMEOUT_MSECS" + EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING" + EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE" + EnvMaxLogSize = "FN_MAX_LOG_SIZE_BYTES" + EnvMaxTotalCPU = "FN_MAX_TOTAL_CPU_MCPUS" + EnvMaxTotalMemory = "FN_MAX_TOTAL_MEMORY_BYTES" + EnvMaxFsSize = "FN_MAX_FS_SIZE_MB" + EnvPreForkPoolSize = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE" + EnvPreForkImage = "FN_EXPERIMENTAL_PREFORK_IMAGE" + EnvPreForkCmd = "FN_EXPERIMENTAL_PREFORK_CMD" + EnvPreForkUseOnce = "FN_EXPERIMENTAL_PREFORK_USE_ONCE" + EnvPreForkNetworks = "FN_EXPERIMENTAL_PREFORK_NETWORKS" + EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER" MaxDisabledMsecs = time.Duration(math.MaxInt64) ) @@ -85,6 +87,10 @@ func NewAgentConfig() (*AgentConfig, error) { return cfg, err } + if _, ok := os.LookupEnv(EnvEnableNBResourceTracker); ok { + cfg.EnableNBResourceTracker = true + } + if cfg.EjectIdle == time.Duration(0) { return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle) } diff --git a/api/agent/resource.go b/api/agent/resource.go index 72da51e45..1a4b608f4 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -23,6 +23,8 @@ const ( Mem1GB = 1024 * 1024 * 1024 ) +var CapacityFull = errors.New("max capacity reached") + // A simple resource (memory, cpu, disk, etc.) tracker for scheduling. // TODO: add cpu, disk, network IO for future type ResourceTracker interface { @@ -34,8 +36,10 @@ type ResourceTracker interface { // the channel will never receive anything. If it is not possible to fulfill this resource, the channel // will never receive anything (use IsResourcePossible). If a resource token is available for the provided // resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed. - // Memory is expected to be provided in MB units. - GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync bool) <-chan ResourceToken + // if isNB is set, resource check is done and error token is returned without blocking. + // if isAsync is set, resource allocation specific for async requests is considered. (eg. always allow + // a sync only reserve area) Memory is expected to be provided in MB units. + GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken // IsResourcePossible returns whether it's possible to fulfill the requested resources on this // machine. It must be called before GetResourceToken or GetResourceToken may hang. @@ -88,13 +92,19 @@ func NewResourceTracker(cfg *AgentConfig) ResourceTracker { type ResourceToken interface { // Close must be called by any thread that receives a token. io.Closer + Error() error } type resourceToken struct { once sync.Once + err error decrement func() } +func (t *resourceToken) Error() error { + return t.err +} + func (t *resourceToken) Close() error { t.once.Do(func() { t.decrement() @@ -140,10 +150,93 @@ func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 { return waiters } +func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken { + + var asyncMem, syncMem uint64 + var asyncCPU, syncCPU uint64 + + if isAsync { + // async uses async pool only + asyncMem = memory + asyncCPU = cpuQuota + } else { + // if sync fits async + sync pool + syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory) + syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota) + + asyncMem = memory - syncMem + asyncCPU = cpuQuota - syncCPU + } + + a.ramAsyncUsed += asyncMem + a.ramSyncUsed += syncMem + a.cpuAsyncUsed += asyncCPU + a.cpuSyncUsed += syncCPU + + return &resourceToken{decrement: func() { + + a.cond.L.Lock() + a.ramAsyncUsed -= asyncMem + a.ramSyncUsed -= syncMem + a.cpuAsyncUsed -= asyncCPU + a.cpuSyncUsed -= syncCPU + a.cond.L.Unlock() + + // WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but + // the cost of this spurious wake up is unlikely to impact much performance. Simpler + // to use one cond variable for the time being. + a.cond.Broadcast() + }} +} + +func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isAsync bool) ResourceToken { + if !a.IsResourcePossible(memory, cpuQuota, isAsync) { + return &resourceToken{decrement: func() {}, err: CapacityFull} + } + memory = memory * Mem1MB + + var t ResourceToken + + a.cond.L.Lock() + + if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) { + t = &resourceToken{decrement: func() {}, err: CapacityFull} + } else { + t = a.allocResourcesLocked(memory, cpuQuota, isAsync) + } + + a.cond.L.Unlock() + return t +} + +func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken { + ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan") + + ch := make(chan ResourceToken) + go func() { + defer span.End() + t := a.getResourceTokenNB(memory, cpuQuota, isAsync) + + select { + case ch <- t: + case <-ctx.Done(): + // if we can't send b/c nobody is waiting anymore, need to decrement here + t.Close() + } + }() + + return ch +} + // the received token should be passed directly to launch (unconditionally), launch // will close this token (i.e. the receiver should not call Close) -func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken { +func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken { + if isNB { + return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync) + } + ch := make(chan ResourceToken) + if !a.IsResourcePossible(memory, cpuQuota, isAsync) { // return the channel, but never send anything. return ch @@ -186,43 +279,9 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c return } - var asyncMem, syncMem uint64 - var asyncCPU, syncCPU uint64 - - if isAsync { - // async uses async pool only - asyncMem = memory - asyncCPU = cpuQuota - } else { - // if sync fits async + sync pool - syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory) - syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota) - - asyncMem = memory - syncMem - asyncCPU = cpuQuota - syncCPU - } - - a.ramAsyncUsed += asyncMem - a.ramSyncUsed += syncMem - a.cpuAsyncUsed += asyncCPU - a.cpuSyncUsed += syncCPU + t := a.allocResourcesLocked(memory, cpuQuota, isAsync) c.L.Unlock() - t := &resourceToken{decrement: func() { - - c.L.Lock() - a.ramAsyncUsed -= asyncMem - a.ramSyncUsed -= syncMem - a.cpuAsyncUsed -= asyncCPU - a.cpuSyncUsed -= syncCPU - c.L.Unlock() - - // WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but - // the cost of this spurious wake up is unlikely to impact much performance. Simpler - // to use one cond variable for the time being. - c.Broadcast() - }} - select { case ch <- t: case <-ctx.Done(): diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go index cb4527e4c..4cd038610 100644 --- a/api/agent/resource_test.go +++ b/api/agent/resource_test.go @@ -179,7 +179,7 @@ func TestResourceGetSimple(t *testing.T) { // ask for 4GB and 10 CPU ctx, cancel := context.WithCancel(context.Background()) - ch := trI.GetResourceToken(ctx, 4*1024, 1000, false) + ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false) defer cancel() _, err := fetchToken(ch) @@ -198,7 +198,7 @@ func TestResourceGetSimple(t *testing.T) { // ask for another 4GB and 10 CPU ctx, cancel = context.WithCancel(context.Background()) - ch = trI.GetResourceToken(ctx, 4*1024, 1000, false) + ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false) defer cancel() _, err = fetchToken(ch) @@ -226,6 +226,69 @@ func TestResourceGetSimple(t *testing.T) { } } +func TestResourceGetSimpleNB(t *testing.T) { + + var vals trackerVals + trI := NewResourceTracker(nil) + tr := trI.(*resourceTracker) + + vals.setDefaults() + + // let's make it like CPU and MEM are 100% full + vals.mau = vals.mat + vals.cau = vals.cat + + setTrackerTestVals(tr, &vals) + + // ask for 4GB and 10 CPU + ctx, cancel := context.WithCancel(context.Background()) + ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + defer cancel() + + tok := <-ch + if tok.Error() == nil { + t.Fatalf("full system should not hand out token") + } + + // reset back + vals.setDefaults() + setTrackerTestVals(tr, &vals) + + tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + if tok1.Error() != nil { + t.Fatalf("empty system should hand out token") + } + + // ask for another 4GB and 10 CPU + ctx, cancel = context.WithCancel(context.Background()) + ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + defer cancel() + + tok = <-ch + if tok.Error() == nil { + t.Fatalf("full system should not hand out token") + } + + // close means, giant token resources released + tok1.Close() + + tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true) + if tok.Error() != nil { + t.Fatalf("empty system should hand out token") + } + + tok.Close() + + // POOLS should all be empty now + getTrackerTestVals(tr, &vals) + if vals.msu != 0 || vals.mau != 0 { + t.Fatalf("faulty state MEM %#v", vals) + } + if vals.csu != 0 || vals.cau != 0 { + t.Fatalf("faulty state CPU %#v", vals) + } +} + func TestResourceGetCombo(t *testing.T) { var vals trackerVals @@ -237,7 +300,7 @@ func TestResourceGetCombo(t *testing.T) { // impossible request ctx, cancel := context.WithCancel(context.Background()) - ch := trI.GetResourceToken(ctx, 20*1024, 20000, false) + ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false) _, err := fetchToken(ch) if err == nil { t.Fatalf("impossible request should never return (error here)") @@ -247,7 +310,7 @@ func TestResourceGetCombo(t *testing.T) { ctx, cancel = context.WithCancel(context.Background()) // let's use up 2 GB of 3GB async pool - ch = trI.GetResourceToken(ctx, 2*1024, 10, true) + ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false) tok1, err := fetchToken(ch) if err != nil { t.Fatalf("empty async system should hand out token1") @@ -257,7 +320,7 @@ func TestResourceGetCombo(t *testing.T) { ctx, cancel = context.WithCancel(context.Background()) // remaining 1 GB async - ch = trI.GetResourceToken(ctx, 1*1024, 11, true) + ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false) tok2, err := fetchToken(ch) if err != nil { t.Fatalf("empty async system should hand out token2") @@ -270,7 +333,7 @@ func TestResourceGetCombo(t *testing.T) { // SYNC POOL HAS 1GB // we no longer can get async token - ch = trI.GetResourceToken(ctx, 1*1024, 12, true) + ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false) _, err = fetchToken(ch) if err == nil { t.Fatalf("full async system should not hand out a token") @@ -280,7 +343,7 @@ func TestResourceGetCombo(t *testing.T) { ctx, cancel = context.WithCancel(context.Background()) // but we should get 1GB sync token - ch = trI.GetResourceToken(ctx, 1*1024, 13, false) + ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false) tok3, err := fetchToken(ch) if err != nil { t.Fatalf("empty sync system should hand out token3") @@ -292,7 +355,7 @@ func TestResourceGetCombo(t *testing.T) { // NOW ASYNC AND SYNC POOLS ARE FULL // this should fail - ch = trI.GetResourceToken(ctx, 1*1024, 14, false) + ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false) _, err = fetchToken(ch) if err == nil { t.Fatalf("full system should not hand out a token") @@ -308,7 +371,7 @@ func TestResourceGetCombo(t *testing.T) { // SYNC POOL IS FULL // async pool should provide this - ch = trI.GetResourceToken(ctx, 1*1024, 15, false) + ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false) tok4, err := fetchToken(ch) if err != nil { t.Fatalf("async system should hand out token4") @@ -326,7 +389,7 @@ func TestResourceGetCombo(t *testing.T) { // SYNC POOL HAS 1GB FREE // now, we ask for 2GB sync token, it should be provided from both async+sync pools - ch = trI.GetResourceToken(ctx, 2*1024, 16, false) + ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false) tok5, err := fetchToken(ch) if err != nil { t.Fatalf("async+sync system should hand out token5") diff --git a/api/agent/slots.go b/api/agent/slots.go index 86d357a12..d2fcf326a 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -50,7 +50,7 @@ type slotQueue struct { cond *sync.Cond slots []*slotToken nextId uint64 - signaller chan bool + signaller chan chan error statsLock sync.Mutex // protects stats below stats slotQueueStats } @@ -67,7 +67,7 @@ func NewSlotQueue(key string) *slotQueue { key: key, cond: sync.NewCond(new(sync.Mutex)), slots: make([]*slotToken, 0), - signaller: make(chan bool, 1), + signaller: make(chan chan error, 1), } return obj diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 84e7a4740..b4579fc13 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -146,6 +146,101 @@ func TestRouteRunnerPost(t *testing.T) { } } +func TestRouteRunnerFastFail(t *testing.T) { + buf := setLogBuffer() + isFailure := false + + tweaker1 := envTweaker("FN_MAX_TOTAL_MEMORY_BYTES", "134217728") // 128MB + tweaker2 := envTweaker("FN_ENABLE_NB_RESOURCE_TRACKER", "yes") // enable fast-fail (no wait on CPU/Mem) + defer tweaker1() + defer tweaker2() + + // Log once after we are done, flow of events are important (hot/cold containers, idle timeout, etc.) + // for figuring out why things failed. + defer func() { + if isFailure { + t.Log(buf.String()) + } + }() + + rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer + rImg := "fnproject/fn-test-utils" + + app := &models.App{Name: "foo"} + app.SetDefaults() + + ds := datastore.NewMockInit( + []*models.App{app}, + []*models.Route{ + {Path: "/json", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 70, Timeout: 30, IdleTimeout: 30, Config: rCfg}, + }, + ) + + rnr, cancelrnr := testRunner(t, ds) + defer cancelrnr() + + srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull) + ok := `{"sleepTime": 1000, "isDebug": true}` + + results := make(chan error) + + type tester struct { + path string + body string + method string + expectedCode int + expectedErrSubStr string + } + + for idx, test := range []tester{ + {"/r/foo/json/", ok, "GET", http.StatusOK, ""}, + {"/r/foo/json/", ok, "GET", http.StatusOK, ""}, + {"/r/foo/json/", ok, "GET", http.StatusOK, ""}, + {"/r/foo/json/", ok, "GET", http.StatusOK, ""}, + } { + go func(i int, test tester) { + body := strings.NewReader(test.body) + _, rec := routerRequest(t, srv.Router, test.method, test.path, body) + respBytes, _ := ioutil.ReadAll(rec.Body) + respBody := string(respBytes) + maxBody := len(respBody) + if maxBody > 1024 { + maxBody = 1024 + } + + if rec.Code != test.expectedCode { + results <- fmt.Errorf("Test %d: Expected status code to be %d but was %d. body: %s", + i, test.expectedCode, rec.Code, respBody[:maxBody]) + } else if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) { + results <- fmt.Errorf("Test %d: Expected response to include %s but got body: %s", + i, test.expectedErrSubStr, respBody[:maxBody]) + } else { + results <- nil + } + + }(idx, test) + } + + totalSuccess := 0 + totalFail := 0 + + // Scan for 4 test results + for i := 0; i < 4; i++ { + err := <-results + if err != nil { + t.Logf("Test %d: received: %s (this is probably OK)", i, err.Error()) + totalFail++ + } else { + totalSuccess++ + } + } + + if totalSuccess != 1 { + t.Errorf("Expected 1 success but got %d (fail: %d)", totalSuccess, totalFail) + isFailure = true + } +} + func TestRouteRunnerIOPipes(t *testing.T) { buf := setLogBuffer() isFailure := false