diff --git a/api/agent/agent.go b/api/agent/agent.go
index 7ae866d09..6bfd50f94 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -315,6 +315,8 @@ func transformTimeout(e error, isRetriable bool) error {
 			return models.ErrCallTimeoutServerBusy
 		}
 		return models.ErrCallTimeout
+	} else if e == CapacityFull {
+		return models.ErrCallTimeoutServerBusy
 	}
 	return e
 }
@@ -402,9 +404,12 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
 	ctx, span := trace.StartSpan(ctx, "agent_hot_launcher")
 	defer span.End()
 
+	var notifyChan chan error
+
 	for {
 		ctx, cancel := context.WithTimeout(ctx, timeout)
-		a.checkLaunch(ctx, call)
+		a.checkLaunch(ctx, call, notifyChan)
+		notifyChan = nil
 
 		select {
 		case <-a.shutWg.Closer(): // server shutdown
@@ -416,15 +421,25 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
 				logger.Info("Hot function launcher timed out")
 				return
 			}
-		case <-call.slots.signaller:
+		case notifyChan = <-call.slots.signaller:
 			cancel()
 		}
 	}
 }
 
-func (a *agent) checkLaunch(ctx context.Context, call *call) {
+func tryNotify(notifyChan chan error, err error) {
+	if notifyChan != nil && err != nil {
+		select {
+		case notifyChan <- err:
+		default:
+		}
+	}
+}
+
+func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan error) {
 	curStats := call.slots.getStats()
 	isAsync := call.Type == models.TypeAsync
+	isNB := a.cfg.EnableNBResourceTracker
 	isNeeded := isNewContainerNeeded(&curStats)
 	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": curStats, "isNeeded": isNeeded}).Debug("Hot function launcher stats")
 	if !isNeeded {
@@ -436,9 +451,28 @@
 
 	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
 
+	// WARNING: Tricky flow below. We are here because isNeeded is set;
+	// in other words, we need to launch a new container at this time due to high load.
+	//
+	// For non-blocking mode this means that, if we cannot acquire resources (cpu+mem), we need
+	// to notify the caller through notifyChan. This is not perfect, as the callers and
+	// checkLaunch do not match 1-1. But this is OK; we can notify *any* waiter that
+	// has signalled us, because non-blocking mode is a system-wide setting.
+	// The notifications are lossy, but callers will signal/poll again if this is the case,
+	// or it may not matter if they have already acquired an empty slot.
+	//
+	// For non-blocking mode, a.cfg.HotPoll should not be set too high, since a missed
+	// notify event from here adds a.cfg.HotPoll msecs of latency. Keeping a.cfg.HotPoll low may
+	// be an acceptable short-term workaround, since non-blocking mode is likely to reduce
+	// the number of waiters, which could compensate for the more frequent polling.
+	//
+	// Non-blocking mode only applies to cpu+mem; if isNeeded decided that we do not
+	// need to start a new container, then waiters will wait.
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
-		if a.shutWg.AddSession(1) {
+	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
+		if tok != nil && tok.Error() != nil {
+			tryNotify(notifyChan, tok.Error())
+		} else if a.shutWg.AddSession(1) {
 			go func() {
 				// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
 				a.runHot(ctx, call, tok, state)
@@ -466,12 +500,16 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 	ch := call.slots.startDequeuer(ctx)
 
+	notifyChan := make(chan error)
+
 	// 1) if we can get a slot immediately, grab it.
 	// 2) if we don't, send a signaller every x msecs until we do.
 
 	sleep := 1 * time.Microsecond // pad, so time.After doesn't send immediately
 
 	for {
 		select {
+		case err := <-notifyChan:
+			return nil, err
 		case s := <-ch:
 			if call.slots.acquireSlot(s) {
 				if s.slot.Error() != nil {
@@ -493,7 +531,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
 			sleep = a.cfg.HotPoll
 			// send a notification to launchHot()
 			select {
-			case call.slots.signaller <- true:
+			case call.slots.signaller <- notifyChan:
 			default:
 			}
 		}
@@ -503,6 +541,8 @@
 // returns the slot for that new container to run the request on.
 func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	isAsync := call.Type == models.TypeAsync
+	isNB := a.cfg.EnableNBResourceTracker
+
 	ch := make(chan Slot)
 
 	ctx, span := trace.StartSpan(ctx, "agent_launch_cold")
@@ -511,7 +551,11 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
 	call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
 
 	select {
-	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync):
+	case tok := <-a.resources.GetResourceToken(ctx, call.Memory, uint64(call.CPUs), isAsync, isNB):
+		if tok.Error() != nil {
+			return nil, tok.Error()
+		}
+
 		go a.prepCold(ctx, call, tok, ch)
 	case <-ctx.Done():
 		return nil, ctx.Err()
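The notification path above is deliberately lossy: a waiter in waitHot hands its own error channel to the hot launcher via the slot queue's signaller, and checkLaunch pushes the capacity error back with a non-blocking send (tryNotify). Below is a minimal, self-contained sketch of that handoff pattern; the names are illustrative and not the agent's actual types, and the sketch intentionally shows that a notification can be dropped (the waiter then just polls again).

package main

import (
	"errors"
	"fmt"
	"time"
)

var errCapacityFull = errors.New("max capacity reached")

func main() {
	// Buffered like slotQueue.signaller: at most one pending signal is kept.
	signaller := make(chan chan error, 1)

	// Launcher side: receive a waiter's reply channel and notify it without
	// blocking, mirroring tryNotify in agent.go.
	go func() {
		notifyChan := <-signaller
		select {
		case notifyChan <- errCapacityFull:
		default: // nobody is listening anymore; drop the notification
		}
	}()

	// Waiter side: register interest, then poll for either an error or a timeout.
	notifyChan := make(chan error)
	select {
	case signaller <- notifyChan:
	default: // a signal is already pending; poll again on the next tick
	}

	select {
	case err := <-notifyChan:
		fmt.Println("fast-fail:", err)
	case <-time.After(200 * time.Millisecond):
		fmt.Println("no notification; keep waiting/polling")
	}
}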
diff --git a/api/agent/config.go b/api/agent/config.go
index 5b2fabef4..eceadbaeb 100644
--- a/api/agent/config.go
+++ b/api/agent/config.go
@@ -9,44 +9,46 @@ import (
 )
 
 type AgentConfig struct {
-	MinDockerVersion   string        `json:"min_docker_version"`
-	FreezeIdle         time.Duration `json:"freeze_idle_msecs"`
-	EjectIdle          time.Duration `json:"eject_idle_msecs"`
-	HotPoll            time.Duration `json:"hot_poll_msecs"`
-	HotLauncherTimeout time.Duration `json:"hot_launcher_timeout_msecs"`
-	AsyncChewPoll      time.Duration `json:"async_chew_poll_msecs"`
-	CallEndTimeout     time.Duration `json:"call_end_timeout"`
-	MaxCallEndStacking uint64        `json:"max_call_end_stacking"`
-	MaxResponseSize    uint64        `json:"max_response_size_bytes"`
-	MaxLogSize         uint64        `json:"max_log_size_bytes"`
-	MaxTotalCPU        uint64        `json:"max_total_cpu_mcpus"`
-	MaxTotalMemory     uint64        `json:"max_total_memory_bytes"`
-	MaxFsSize          uint64        `json:"max_fs_size_mb"`
-	PreForkPoolSize    uint64        `json:"pre_fork_pool_size"`
-	PreForkImage       string        `json:"pre_fork_image"`
-	PreForkCmd         string        `json:"pre_fork_pool_cmd"`
-	PreForkUseOnce     uint64        `json:"pre_fork_use_once"`
-	PreForkNetworks    string        `json:"pre_fork_networks"`
+	MinDockerVersion        string        `json:"min_docker_version"`
+	FreezeIdle              time.Duration `json:"freeze_idle_msecs"`
+	EjectIdle               time.Duration `json:"eject_idle_msecs"`
+	HotPoll                 time.Duration `json:"hot_poll_msecs"`
+	HotLauncherTimeout      time.Duration `json:"hot_launcher_timeout_msecs"`
+	AsyncChewPoll           time.Duration `json:"async_chew_poll_msecs"`
+	CallEndTimeout          time.Duration `json:"call_end_timeout"`
+	MaxCallEndStacking      uint64        `json:"max_call_end_stacking"`
+	MaxResponseSize         uint64        `json:"max_response_size_bytes"`
+	MaxLogSize              uint64        `json:"max_log_size_bytes"`
+	MaxTotalCPU             uint64        `json:"max_total_cpu_mcpus"`
+	MaxTotalMemory          uint64        `json:"max_total_memory_bytes"`
+	MaxFsSize               uint64        `json:"max_fs_size_mb"`
+	PreForkPoolSize         uint64        `json:"pre_fork_pool_size"`
+	PreForkImage            string        `json:"pre_fork_image"`
+	PreForkCmd              string        `json:"pre_fork_pool_cmd"`
+	PreForkUseOnce          uint64        `json:"pre_fork_use_once"`
+	PreForkNetworks         string        `json:"pre_fork_networks"`
+	EnableNBResourceTracker bool          `json:"enable_nb_resource_tracker"`
 }
 
 const (
-	EnvFreezeIdle         = "FN_FREEZE_IDLE_MSECS"
-	EnvEjectIdle          = "FN_EJECT_IDLE_MSECS"
-	EnvHotPoll            = "FN_HOT_POLL_MSECS"
-	EnvHotLauncherTimeout = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
-	EnvAsyncChewPoll      = "FN_ASYNC_CHEW_POLL_MSECS"
-	EnvCallEndTimeout     = "FN_CALL_END_TIMEOUT_MSECS"
-	EnvMaxCallEndStacking = "FN_MAX_CALL_END_STACKING"
-	EnvMaxResponseSize    = "FN_MAX_RESPONSE_SIZE"
-	EnvMaxLogSize         = "FN_MAX_LOG_SIZE_BYTES"
-	EnvMaxTotalCPU        = "FN_MAX_TOTAL_CPU_MCPUS"
-	EnvMaxTotalMemory     = "FN_MAX_TOTAL_MEMORY_BYTES"
-	EnvMaxFsSize          = "FN_MAX_FS_SIZE_MB"
-	EnvPreForkPoolSize    = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
-	EnvPreForkImage       = "FN_EXPERIMENTAL_PREFORK_IMAGE"
-	EnvPreForkCmd         = "FN_EXPERIMENTAL_PREFORK_CMD"
-	EnvPreForkUseOnce     = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
-	EnvPreForkNetworks    = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
+	EnvFreezeIdle              = "FN_FREEZE_IDLE_MSECS"
+	EnvEjectIdle               = "FN_EJECT_IDLE_MSECS"
+	EnvHotPoll                 = "FN_HOT_POLL_MSECS"
+	EnvHotLauncherTimeout      = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
+	EnvAsyncChewPoll           = "FN_ASYNC_CHEW_POLL_MSECS"
+	EnvCallEndTimeout          = "FN_CALL_END_TIMEOUT_MSECS"
+	EnvMaxCallEndStacking      = "FN_MAX_CALL_END_STACKING"
+	EnvMaxResponseSize         = "FN_MAX_RESPONSE_SIZE"
+	EnvMaxLogSize              = "FN_MAX_LOG_SIZE_BYTES"
+	EnvMaxTotalCPU             = "FN_MAX_TOTAL_CPU_MCPUS"
+	EnvMaxTotalMemory          = "FN_MAX_TOTAL_MEMORY_BYTES"
+	EnvMaxFsSize               = "FN_MAX_FS_SIZE_MB"
+	EnvPreForkPoolSize         = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
+	EnvPreForkImage            = "FN_EXPERIMENTAL_PREFORK_IMAGE"
+	EnvPreForkCmd              = "FN_EXPERIMENTAL_PREFORK_CMD"
+	EnvPreForkUseOnce          = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
+	EnvPreForkNetworks         = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
+	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"
 
 	MaxDisabledMsecs = time.Duration(math.MaxInt64)
 )
@@ -85,6 +87,10 @@ func NewAgentConfig() (*AgentConfig, error) {
 		return cfg, err
 	}
 
+	if _, ok := os.LookupEnv(EnvEnableNBResourceTracker); ok {
+		cfg.EnableNBResourceTracker = true
+	}
+
 	if cfg.EjectIdle == time.Duration(0) {
 		return cfg, fmt.Errorf("error %s cannot be zero", EnvEjectIdle)
 	}
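The new mode is off by default and is switched on purely by the presence of FN_ENABLE_NB_RESOURCE_TRACKER in the environment; the value itself is not inspected. A rough sketch of how that flows through NewAgentConfig (assuming the usual github.com/fnproject/fn/api/agent import path):

package main

import (
	"fmt"
	"os"

	"github.com/fnproject/fn/api/agent"
)

func main() {
	// Any value works; NewAgentConfig only checks that the variable is set.
	os.Setenv("FN_ENABLE_NB_RESOURCE_TRACKER", "1")

	cfg, err := agent.NewAgentConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.EnableNBResourceTracker) // true
}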
diff --git a/api/agent/resource.go b/api/agent/resource.go
index 72da51e45..1a4b608f4 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -23,6 +23,8 @@ const (
 	Mem1GB = 1024 * 1024 * 1024
 )
 
+var CapacityFull = errors.New("max capacity reached")
+
 // A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
 // TODO: add cpu, disk, network IO for future
 type ResourceTracker interface {
@@ -34,8 +36,10 @@ type ResourceTracker interface {
 	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
 	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
 	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
-	// Memory is expected to be provided in MB units.
-	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync bool) <-chan ResourceToken
+	// If isNB is set, the resource check is done without blocking and a token carrying an error is
+	// returned when there is no capacity. If isAsync is set, allocation specific to async requests is
+	// used (e.g. a sync-only reserve area is always kept). Memory is expected to be provided in MB units.
+	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken
 
 	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
@@ -88,13 +92,19 @@ func NewResourceTracker(cfg *AgentConfig) ResourceTracker {
 type ResourceToken interface {
 	// Close must be called by any thread that receives a token.
 	io.Closer
+	Error() error
 }
 
 type resourceToken struct {
 	once      sync.Once
+	err       error
 	decrement func()
 }
 
+func (t *resourceToken) Error() error {
+	return t.err
+}
+
 func (t *resourceToken) Close() error {
 	t.once.Do(func() {
 		t.decrement()
@@ -140,10 +150,93 @@ func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 {
 	return waiters
 }
 
+func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken {
+
+	var asyncMem, syncMem uint64
+	var asyncCPU, syncCPU uint64
+
+	if isAsync {
+		// async uses async pool only
+		asyncMem = memory
+		asyncCPU = cpuQuota
+	} else {
+		// if sync fits async + sync pool
+		syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory)
+		syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota)
+
+		asyncMem = memory - syncMem
+		asyncCPU = cpuQuota - syncCPU
+	}
+
+	a.ramAsyncUsed += asyncMem
+	a.ramSyncUsed += syncMem
+	a.cpuAsyncUsed += asyncCPU
+	a.cpuSyncUsed += syncCPU
+
+	return &resourceToken{decrement: func() {
+
+		a.cond.L.Lock()
+		a.ramAsyncUsed -= asyncMem
+		a.ramSyncUsed -= syncMem
+		a.cpuAsyncUsed -= asyncCPU
+		a.cpuSyncUsed -= syncCPU
+		a.cond.L.Unlock()
+
+		// WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but
+		// the cost of this spurious wake up is unlikely to impact much performance. Simpler
+		// to use one cond variable for the time being.
+		a.cond.Broadcast()
+	}}
+}
+
+func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota uint64, isAsync bool) ResourceToken {
+	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
+		return &resourceToken{decrement: func() {}, err: CapacityFull}
+	}
+	memory = memory * Mem1MB
+
+	var t ResourceToken
+
+	a.cond.L.Lock()
+
+	if !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) {
+		t = &resourceToken{decrement: func() {}, err: CapacityFull}
+	} else {
+		t = a.allocResourcesLocked(memory, cpuQuota, isAsync)
+	}
+
+	a.cond.L.Unlock()
+	return t
+}
+
+func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
+	ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan")
+
+	ch := make(chan ResourceToken)
+	go func() {
+		defer span.End()
+		t := a.getResourceTokenNB(memory, cpuQuota, isAsync)
+
+		select {
+		case ch <- t:
+		case <-ctx.Done():
+			// if we can't send b/c nobody is waiting anymore, need to decrement here
+			t.Close()
+		}
+	}()
+
+	return ch
+}
+
 // the received token should be passed directly to launch (unconditionally), launch
 // will close this token (i.e. the receiver should not call Close)
-func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
+func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken {
+	if isNB {
+		return a.getResourceTokenNBChan(ctx, memory, cpuQuota, isAsync)
+	}
+
 	ch := make(chan ResourceToken)
+
 	if !a.IsResourcePossible(memory, cpuQuota, isAsync) {
 		// return the channel, but never send anything.
 		return ch
@@ -186,43 +279,9 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota uint64, isAsync bool) <-chan ResourceToken {
 			return
 		}
 
-		var asyncMem, syncMem uint64
-		var asyncCPU, syncCPU uint64
-
-		if isAsync {
-			// async uses async pool only
-			asyncMem = memory
-			asyncCPU = cpuQuota
-		} else {
-			// if sync fits async + sync pool
-			syncMem = minUint64(a.ramSyncTotal-a.ramSyncUsed, memory)
-			syncCPU = minUint64(a.cpuSyncTotal-a.cpuSyncUsed, cpuQuota)
-
-			asyncMem = memory - syncMem
-			asyncCPU = cpuQuota - syncCPU
-		}
-
-		a.ramAsyncUsed += asyncMem
-		a.ramSyncUsed += syncMem
-		a.cpuAsyncUsed += asyncCPU
-		a.cpuSyncUsed += syncCPU
+		t := a.allocResourcesLocked(memory, cpuQuota, isAsync)
 		c.L.Unlock()
 
-		t := &resourceToken{decrement: func() {
-
-			c.L.Lock()
-			a.ramAsyncUsed -= asyncMem
-			a.ramSyncUsed -= syncMem
-			a.cpuAsyncUsed -= asyncCPU
-			a.cpuSyncUsed -= syncCPU
-			c.L.Unlock()
-
-			// WARNING: yes, we wake up everyone even async waiters when only sync pool has space, but
-			// the cost of this spurious wake up is unlikely to impact much performance. Simpler
-			// to use one cond variable for the time being.
-			c.Broadcast()
-		}}
-
 		select {
 		case ch <- t:
 		case <-ctx.Done():
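From a caller's point of view, the non-blocking path delivers exactly one token on the returned channel (unless the context is cancelled first); the token either carries CapacityFull via Error() or holds real capacity that must eventually be released. A hedged sketch of that contract as a standalone consumer, with tracker construction as in the tests below and arbitrary 512 MB / 100 mCPU figures; inside the agent itself the token is handed to launch, which closes it:

package main

import (
	"context"
	"fmt"

	"github.com/fnproject/fn/api/agent"
)

func main() {
	tr := agent.NewResourceTracker(nil) // nil config uses the defaults, as in the resource tracker tests

	ctx := context.Background()
	// isNB=true: the channel yields one token promptly, whether or not capacity exists.
	tok := <-tr.GetResourceToken(ctx, 512, 100, false /* isAsync */, true /* isNB */)
	if tok.Error() != nil {
		// agent.CapacityFull; transformTimeout maps this to models.ErrCallTimeoutServerBusy.
		fmt.Println("fast-fail:", tok.Error())
		return
	}
	defer tok.Close() // releases cpu+mem back to the pools

	fmt.Println("got capacity; run the container")
}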
diff --git a/api/agent/resource_test.go b/api/agent/resource_test.go
index cb4527e4c..4cd038610 100644
--- a/api/agent/resource_test.go
+++ b/api/agent/resource_test.go
@@ -179,7 +179,7 @@ func TestResourceGetSimple(t *testing.T) {
 
 	// ask for 4GB and 10 CPU
 	ctx, cancel := context.WithCancel(context.Background())
-	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false)
+	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
 	defer cancel()
 
 	_, err := fetchToken(ch)
@@ -198,7 +198,7 @@ func TestResourceGetSimple(t *testing.T) {
 
 	// ask for another 4GB and 10 CPU
 	ctx, cancel = context.WithCancel(context.Background())
-	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false)
+	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, false)
 	defer cancel()
 
 	_, err = fetchToken(ch)
@@ -226,6 +226,69 @@ func TestResourceGetSimple(t *testing.T) {
 	}
 }
 
+func TestResourceGetSimpleNB(t *testing.T) {
+
+	var vals trackerVals
+	trI := NewResourceTracker(nil)
+	tr := trI.(*resourceTracker)
+
+	vals.setDefaults()
+
+	// let's make it like CPU and MEM are 100% full
+	vals.mau = vals.mat
+	vals.cau = vals.cat
+
+	setTrackerTestVals(tr, &vals)
+
+	// ask for 4GB and 10 CPU
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	defer cancel()
+
+	tok := <-ch
+	if tok.Error() == nil {
+		t.Fatalf("full system should not hand out token")
+	}
+
+	// reset back
+	vals.setDefaults()
+	setTrackerTestVals(tr, &vals)
+
+	tok1 := <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	if tok1.Error() != nil {
+		t.Fatalf("empty system should hand out token")
+	}
+
+	// ask for another 4GB and 10 CPU
+	ctx, cancel = context.WithCancel(context.Background())
+	ch = trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	defer cancel()
+
+	tok = <-ch
+	if tok.Error() == nil {
+		t.Fatalf("full system should not hand out token")
+	}
+
+	// close means, giant token resources released
+	tok1.Close()
+
+	tok = <-trI.GetResourceToken(ctx, 4*1024, 1000, false, true)
+	if tok.Error() != nil {
+		t.Fatalf("empty system should hand out token")
+	}
+
+	tok.Close()
+
+	// POOLS should all be empty now
+	getTrackerTestVals(tr, &vals)
+	if vals.msu != 0 || vals.mau != 0 {
+		t.Fatalf("faulty state MEM %#v", vals)
+	}
+	if vals.csu != 0 || vals.cau != 0 {
+		t.Fatalf("faulty state CPU %#v", vals)
+	}
+}
+
 func TestResourceGetCombo(t *testing.T) {
 
 	var vals trackerVals
@@ -237,7 +300,7 @@
 
 	// impossible request
 	ctx, cancel := context.WithCancel(context.Background())
-	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false)
+	ch := trI.GetResourceToken(ctx, 20*1024, 20000, false, false)
 	_, err := fetchToken(ch)
 	if err == nil {
 		t.Fatalf("impossible request should never return (error here)")
@@ -247,7 +310,7 @@
 	ctx, cancel = context.WithCancel(context.Background())
 
 	// let's use up 2 GB of 3GB async pool
-	ch = trI.GetResourceToken(ctx, 2*1024, 10, true)
+	ch = trI.GetResourceToken(ctx, 2*1024, 10, true, false)
 	tok1, err := fetchToken(ch)
 	if err != nil {
 		t.Fatalf("empty async system should hand out token1")
@@ -257,7 +320,7 @@
 	ctx, cancel = context.WithCancel(context.Background())
 
 	// remaining 1 GB async
-	ch = trI.GetResourceToken(ctx, 1*1024, 11, true)
+	ch = trI.GetResourceToken(ctx, 1*1024, 11, true, false)
 	tok2, err := fetchToken(ch)
 	if err != nil {
 		t.Fatalf("empty async system should hand out token2")
@@ -270,7 +333,7 @@
 	// SYNC POOL HAS 1GB
 
 	// we no longer can get async token
-	ch = trI.GetResourceToken(ctx, 1*1024, 12, true)
+	ch = trI.GetResourceToken(ctx, 1*1024, 12, true, false)
 	_, err = fetchToken(ch)
 	if err == nil {
 		t.Fatalf("full async system should not hand out a token")
@@ -280,7 +343,7 @@
 	ctx, cancel = context.WithCancel(context.Background())
 
 	// but we should get 1GB sync token
-	ch = trI.GetResourceToken(ctx, 1*1024, 13, false)
+	ch = trI.GetResourceToken(ctx, 1*1024, 13, false, false)
 	tok3, err := fetchToken(ch)
 	if err != nil {
 		t.Fatalf("empty sync system should hand out token3")
@@ -292,7 +355,7 @@
 	// NOW ASYNC AND SYNC POOLS ARE FULL
 
 	// this should fail
-	ch = trI.GetResourceToken(ctx, 1*1024, 14, false)
+	ch = trI.GetResourceToken(ctx, 1*1024, 14, false, false)
 	_, err = fetchToken(ch)
 	if err == nil {
 		t.Fatalf("full system should not hand out a token")
@@ -308,7 +371,7 @@
 	// SYNC POOL IS FULL
 
 	// async pool should provide this
-	ch = trI.GetResourceToken(ctx, 1*1024, 15, false)
+	ch = trI.GetResourceToken(ctx, 1*1024, 15, false, false)
 	tok4, err := fetchToken(ch)
 	if err != nil {
 		t.Fatalf("async system should hand out token4")
@@ -326,7 +389,7 @@
 	// SYNC POOL HAS 1GB FREE
 
 	// now, we ask for 2GB sync token, it should be provided from both async+sync pools
-	ch = trI.GetResourceToken(ctx, 2*1024, 16, false)
+	ch = trI.GetResourceToken(ctx, 2*1024, 16, false, false)
 	tok5, err := fetchToken(ch)
 	if err != nil {
 		t.Fatalf("async+sync system should hand out token5")
diff --git a/api/agent/slots.go b/api/agent/slots.go
index 86d357a12..d2fcf326a 100644
--- a/api/agent/slots.go
+++ b/api/agent/slots.go
@@ -50,7 +50,7 @@ type slotQueue struct {
 	cond      *sync.Cond
 	slots     []*slotToken
 	nextId    uint64
-	signaller chan bool
+	signaller chan chan error
 	statsLock sync.Mutex // protects stats below
 	stats     slotQueueStats
 }
@@ -67,7 +67,7 @@ func NewSlotQueue(key string) *slotQueue {
 		key:       key,
 		cond:      sync.NewCond(new(sync.Mutex)),
 		slots:     make([]*slotToken, 0),
-		signaller: make(chan bool, 1),
+		signaller: make(chan chan error, 1),
 	}
 
 	return obj
diff --git a/api/server/runner_test.go b/api/server/runner_test.go
index 84e7a4740..b4579fc13 100644
--- a/api/server/runner_test.go
+++ b/api/server/runner_test.go
@@ -146,6 +146,101 @@ func TestRouteRunnerPost(t *testing.T) {
 	}
 }
 
+func TestRouteRunnerFastFail(t *testing.T) {
+	buf := setLogBuffer()
+	isFailure := false
+
+	tweaker1 := envTweaker("FN_MAX_TOTAL_MEMORY_BYTES", "134217728") // 128MB
+	tweaker2 := envTweaker("FN_ENABLE_NB_RESOURCE_TRACKER", "yes")   // enable fast-fail (no wait on CPU/Mem)
+	defer tweaker1()
+	defer tweaker2()
+
+	// Log once after we are done; the flow of events (hot/cold containers, idle timeout, etc.) is important
+	// for figuring out why things failed.
+	defer func() {
+		if isFailure {
+			t.Log(buf.String())
+		}
+	}()
+
+	rCfg := map[string]string{"ENABLE_HEADER": "yes", "ENABLE_FOOTER": "yes"} // enable container start/end header/footer
+	rImg := "fnproject/fn-test-utils"
+
+	app := &models.App{Name: "foo"}
+	app.SetDefaults()
+
+	ds := datastore.NewMockInit(
+		[]*models.App{app},
+		[]*models.Route{
+			{Path: "/json", AppID: app.ID, Image: rImg, Type: "sync", Format: "json", Memory: 70, Timeout: 30, IdleTimeout: 30, Config: rCfg},
+		},
+	)
+
+	rnr, cancelrnr := testRunner(t, ds)
+	defer cancelrnr()
+
+	srv := testServer(ds, &mqs.Mock{}, ds, rnr, ServerTypeFull)
+	ok := `{"sleepTime": 1000, "isDebug": true}`
+
+	results := make(chan error)
+
+	type tester struct {
+		path              string
+		body              string
+		method            string
+		expectedCode      int
+		expectedErrSubStr string
+	}
+
+	for idx, test := range []tester{
+		{"/r/foo/json/", ok, "GET", http.StatusOK, ""},
+		{"/r/foo/json/", ok, "GET", http.StatusOK, ""},
+		{"/r/foo/json/", ok, "GET", http.StatusOK, ""},
+		{"/r/foo/json/", ok, "GET", http.StatusOK, ""},
+	} {
+		go func(i int, test tester) {
+			body := strings.NewReader(test.body)
+			_, rec := routerRequest(t, srv.Router, test.method, test.path, body)
+			respBytes, _ := ioutil.ReadAll(rec.Body)
+			respBody := string(respBytes)
+			maxBody := len(respBody)
+			if maxBody > 1024 {
+				maxBody = 1024
+			}
+
+			if rec.Code != test.expectedCode {
+				results <- fmt.Errorf("Test %d: Expected status code to be %d but was %d. body: %s",
+					i, test.expectedCode, rec.Code, respBody[:maxBody])
+			} else if test.expectedErrSubStr != "" && !strings.Contains(respBody, test.expectedErrSubStr) {
+				results <- fmt.Errorf("Test %d: Expected response to include %s but got body: %s",
+					i, test.expectedErrSubStr, respBody[:maxBody])
+			} else {
+				results <- nil
+			}
+
+		}(idx, test)
+	}
+
+	totalSuccess := 0
+	totalFail := 0
+
+	// Scan for 4 test results
+	for i := 0; i < 4; i++ {
+		err := <-results
+		if err != nil {
+			t.Logf("Test %d: received: %s (this is probably OK)", i, err.Error())
+			totalFail++
+		} else {
+			totalSuccess++
+		}
+	}
+
+	if totalSuccess != 1 {
+		t.Errorf("Expected 1 success but got %d (fail: %d)", totalSuccess, totalFail)
+		isFailure = true
+	}
+}
+
 func TestRouteRunnerIOPipes(t *testing.T) {
 	buf := setLogBuffer()
 	isFailure := false