From 1258baeb7f2760727d504b0448f9b507b1bdbc07 Mon Sep 17 00:00:00 2001
From: Tolga Ceylan
Date: Thu, 19 Jul 2018 15:04:15 -0700
Subject: [PATCH] fn: agent eviction revisited (#1131)

* fn: agent eviction revisited

Previously, the hot-container eviction logic used the number of waiters for cpu/mem resources to decide whether to evict a container. An ejection ticker woke up its associated container every 1 second to reassess system load based on the waiter count. However, this does not work for the non-blocking agent, since there are no waiters in non-blocking mode.

Background on blocking versus non-blocking agents:

*) A blocking agent holds a request until it is serviced or the client times out. It assumes the request can eventually be serviced as idle containers eject themselves or busy containers finish their work.

*) Non-blocking mode tries to limit this wait time. However, the non-blocking agent has never been truly non-blocking; it only means that a request is made to wait when we take some action in the system. Non-blocking agents are configured with a much higher hot-poll frequency to make the system more responsive and to handle cases where a too-busy event is missed by the request. This can happen because the communication between the hot launcher and waiting requests is not 1-1 and is lossy: another request arriving for the same slot queue may receive the too-busy response before the original request does.

This change introduces an evictor with which each hot container can register itself once it has been idle for more than 1 second. Upon registration, these idle containers become eligible for eviction.

In the hot-container launcher, in non-blocking mode, before we emit a too-busy response we now attempt an eviction. If it succeeds, we wait some more. This can result in requests waiting longer than they used to, but only if a container was actually evicted. In blocking mode, the hot launcher uses the hot-poll period to decide whether a request has waited too long, and then triggers an eviction.
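The intended interplay between a starved request and the new evictor is summarized in the compact sketch below. It is a simplified illustration of the checkLaunch change in this patch, not a verbatim copy: launchOrEvict, resourceFull and tooBusy are illustrative names, and the snippet assumes it sits in the agent package alongside the Evictor interface added here.

    package agent

    import "time"

    // launchOrEvict sketches the starved-request side of the new flow: if the
    // resource tracker reports the system is full (or, in blocking mode, the
    // request has waited a full hot-poll period), try to reclaim capacity by
    // evicting idle containers from other slot queues before giving up.
    // resourceFull and tooBusy are illustrative stand-ins, not agent APIs.
    func launchOrEvict(ev Evictor, slotHashId string, mem, cpu uint64,
        hotPoll time.Duration, resourceFull <-chan error, tooBusy chan<- error) {

        select {
        case err := <-resourceFull:
            // Last resort before telling the client we are too busy: see whether
            // enough idle containers can be evicted to cover mem and cpu.
            if !ev.PerformEviction(slotHashId, mem, cpu) {
                tooBusy <- err // nothing evictable; surface the too-busy error
            }
            // if eviction succeeded, the request simply keeps waiting for a slot

        case <-time.After(hotPoll):
            // Waited a full hot-poll period already; try an eviction anyway.
            ev.PerformEviction(slotHashId, mem, cpu)
        }
    }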
--- api/agent/agent.go | 47 +++++++--- api/agent/evictor.go | 186 ++++++++++++++++++++++++++++++++++++++ api/agent/evictor_test.go | 143 +++++++++++++++++++++++++++++ api/agent/resource.go | 18 ---- api/agent/slots.go | 9 +- docs/operating/options.md | 2 +- 6 files changed, 365 insertions(+), 40 deletions(-) create mode 100644 api/agent/evictor.go create mode 100644 api/agent/evictor_test.go diff --git a/api/agent/agent.go b/api/agent/agent.go index 1e857ad35..f68674721 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -97,6 +97,7 @@ type agent struct { driver drivers.Driver slotMgr *slotQueueMgr + evictor Evictor // track usage resources ResourceTracker @@ -129,6 +130,7 @@ func New(da CallHandler, options ...AgentOption) Agent { a.shutWg = common.NewWaitGroup() a.da = da a.slotMgr = NewSlotQueueMgr() + a.evictor = NewEvictor() // Allow overriding config for _, option := range options { @@ -421,7 +423,12 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) { if protocol.IsStreamable(protocol.Protocol(call.Format)) { // For hot requests, we use a long lived slot queue, which we use to manage hot containers var isNew bool - call.slots, isNew = a.slotMgr.getSlotQueue(call) + + if call.slotHashId == "" { + call.slotHashId = getSlotQueueKey(call) + } + + call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId) call.requestState.UpdateState(ctx, RequestStateWait, call.slots) if isNew { go a.hotLauncher(ctx, call) @@ -501,7 +508,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err state := NewContainerState() state.UpdateState(ctx, ContainerStateWait, call.slots) - common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container") + common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Debug("Hot function launcher attempting to start a container") mem := call.Memory + uint64(call.TmpFsSize) @@ -525,18 +532,27 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err select { case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB): if tok != nil && tok.Error() != nil { - tryNotify(notifyChan, tok.Error()) + // before returning error response, as a last resort, try evicting idle containers. + if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) { + tryNotify(notifyChan, tok.Error()) + } } else if a.shutWg.AddSession(1) { go func() { // NOTE: runHot will not inherit the timeout from ctx (ignore timings) a.runHot(ctx, call, tok, state) a.shutWg.DoneSession() }() + // early return (do not allow container state to switch to ContainerStateDone) return } if tok != nil { tok.Close() } + // Request routines are polling us with this a.cfg.HotPoll frequency. We can use this + // same timer to assume that we waited for cpu/mem long enough. Let's try to evict an + // idle container. 
+ case <-time.After(a.cfg.HotPoll): + a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) case <-ctx.Done(): // timeout case <-a.shutWg.Closer(): // server shutdown } @@ -913,14 +929,15 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, var err error isFrozen := false + isEvictable := false freezeTimer := time.NewTimer(a.cfg.FreezeIdle) idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second) - ejectTicker := time.NewTicker(a.cfg.EjectIdle) + ejectTimer := time.NewTimer(a.cfg.EjectIdle) defer freezeTimer.Stop() defer idleTimer.Stop() - defer ejectTicker.Stop() + defer ejectTimer.Stop() // log if any error is encountered defer func() { @@ -938,6 +955,8 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, isFrozen = true } + evictor := a.evictor.GetEvictor(call.ID, call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs)) + state.UpdateState(ctx, ContainerStateIdle, call.slots) s := call.slots.queueSlot(slot) @@ -956,19 +975,21 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, isFrozen = true } continue - case <-ejectTicker.C: - // if someone is waiting for resource in our slot queue, we must not terminate, - // otherwise, see if other slot queues have resource waiters that are blocked. - stats := call.slots.getStats() - if stats.containerStates[ContainerStateWait] > 0 || - a.resources.GetResourceTokenWaiterCount() <= 0 { - continue - } + case <-evictor.C: logger.Debug("attempting hot function eject") + case <-ejectTimer.C: + // we've been idle too long, now we are ejectable + a.evictor.RegisterEvictor(evictor) + isEvictable = true + continue } break } + if isEvictable { + a.evictor.UnregisterEvictor(evictor) + } + // if we can acquire token, that means we are here due to // abort/shutdown/timeout, attempt to acquire and terminate, // otherwise continue processing the request diff --git a/api/agent/evictor.go b/api/agent/evictor.go new file mode 100644 index 000000000..2a4662e5b --- /dev/null +++ b/api/agent/evictor.go @@ -0,0 +1,186 @@ +package agent + +import ( + "sync" +) + +// Evictor For Agent +// Agent hot containers can register themselves as evictable using +// Register/Unregister calls. If a hot container registers itself, +// a starved request can call PerformEviction() to scan the eligible +// hot containers and if a number of these can be evicted to satisfy +// memory+cpu needs of the starved request, then those hot-containers +// are evicted (which is signalled using their channel.) + +type tokenKey struct { + id string + slotId string + memory uint64 + cpu uint64 +} + +type EvictToken struct { + key tokenKey + C chan struct{} +} + +type Evictor interface { + // Create an eviction token to be used in register/unregister functions + GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken + + // register an eviction token with evictor system + RegisterEvictor(token *EvictToken) + + // unregister an eviction token from evictor system + UnregisterEvictor(token *EvictToken) + + // perform eviction to satisfy resource requirements of the call + // returns true if evictions were performed to satisfy the requirements. 
+ PerformEviction(slotId string, mem, cpu uint64) bool +} + +type evictor struct { + lock sync.Mutex + id uint64 + tokens map[string]*EvictToken + slots []tokenKey +} + +func NewEvictor() Evictor { + return &evictor{ + tokens: make(map[string]*EvictToken), + slots: make([]tokenKey, 0), + } +} + +func (tok *EvictToken) isEvicted() bool { + select { + case <-tok.C: + return true + default: + } + return false +} + +func (tok *EvictToken) isEligible() bool { + // if no resource limits are in place, then this + // function is not eligible. + if tok.key.memory == 0 && tok.key.cpu == 0 { + return false + } + return true +} + +func (e *evictor) GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken { + key := tokenKey{ + id: id, + slotId: slotId, + memory: mem, + cpu: cpu, + } + + return &EvictToken{ + key: key, + C: make(chan struct{}), + } +} + +func (e *evictor) RegisterEvictor(token *EvictToken) { + if !token.isEligible() || token.isEvicted() { + return + } + + e.lock.Lock() + + // be paranoid, do not register if it's already there + _, ok := e.tokens[token.key.id] + if !ok { + e.tokens[token.key.id] = token + e.slots = append(e.slots, token.key) + } + + e.lock.Unlock() +} + +func (e *evictor) UnregisterEvictor(token *EvictToken) { + if !token.isEligible() || token.isEvicted() { + return + } + + e.lock.Lock() + + for idx, val := range e.slots { + if val.id == token.key.id { + e.slots = append(e.slots[:idx], e.slots[idx+1:]...) + break + } + } + delete(e.tokens, token.key.id) + + e.lock.Unlock() +} + +func (e *evictor) PerformEviction(slotId string, mem, cpu uint64) bool { + // if no resources are defined for this function, then + // we don't know what to do here. We cannot evict anyone + // in this case. + if mem == 0 && cpu == 0 { + return false + } + + // Our eviction sum so far + totalMemory := uint64(0) + totalCpu := uint64(0) + isSatisfied := false + + var keys []string + var chans []chan struct{} + + e.lock.Lock() + + for _, val := range e.slots { + // lets not evict from our own slot queue + if slotId == val.slotId { + continue + } + + totalMemory += val.memory + totalCpu += val.cpu + keys = append(keys, val.id) + + // did we satisfy the need? + if totalMemory >= mem && totalCpu >= cpu { + isSatisfied = true + break + } + } + + // If we can satisfy the need, then let's commit/perform eviction + if isSatisfied { + + chans = make([]chan struct{}, 0, len(keys)) + idx := 0 + for _, id := range keys { + + // do not initialize idx, we continue where we left off + // since keys are in order from above. + for ; idx < len(e.slots); idx++ { + if id == e.slots[idx].id { + e.slots = append(e.slots[:idx], e.slots[idx+1:]...) 
+ break + } + } + + chans = append(chans, e.tokens[id].C) + delete(e.tokens, id) + } + } + + e.lock.Unlock() + + for _, ch := range chans { + close(ch) + } + + return isSatisfied +} diff --git a/api/agent/evictor_test.go b/api/agent/evictor_test.go new file mode 100644 index 000000000..4f9102091 --- /dev/null +++ b/api/agent/evictor_test.go @@ -0,0 +1,143 @@ +package agent + +import ( + "testing" +) + +func getACall(id, slot string, mem, cpu int) (string, string, uint64, uint64) { + return id, slot, uint64(mem), uint64(cpu) +} + +func TestEvictorSimple01(t *testing.T) { + evictor := NewEvictor() + + slotId := "slot1" + id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100) + id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100) + + token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1) + token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2) + + evictor.RegisterEvictor(token1) + evictor.RegisterEvictor(token2) + + if evictor.PerformEviction(slotId, mem1, cpu1) { + t.Fatalf("We should not be able to self evict") + } + if evictor.PerformEviction("foo", 0, 0) { + t.Fatalf("We should not be able to evict: zero cpu/mem") + } + if evictor.PerformEviction("foo", 1, 300) { + t.Fatalf("We should not be able to evict (resource not enough)") + } + + if token1.isEvicted() { + t.Fatalf("should not be evicted") + } + if token2.isEvicted() { + t.Fatalf("should not be evicted") + } + + if !evictor.PerformEviction("foo", 1, 100) { + t.Fatalf("We should be able to evict") + } + + if !token1.isEvicted() { + t.Fatalf("should be evicted") + } + if token2.isEvicted() { + t.Fatalf("should not be evicted") + } + + evictor.UnregisterEvictor(token1) + evictor.UnregisterEvictor(token2) +} + +func TestEvictorSimple02(t *testing.T) { + evictor := NewEvictor() + + id1, slotId1, mem1, cpu1 := getACall("id1", "slot1", 1, 100) + id2, slotId2, mem2, cpu2 := getACall("id2", "slot1", 1, 100) + + token1 := evictor.GetEvictor(id1, slotId1, mem1, cpu1) + token2 := evictor.GetEvictor(id2, slotId2, mem2, cpu2) + + // add/rm/add + evictor.RegisterEvictor(token1) + evictor.UnregisterEvictor(token1) + evictor.RegisterEvictor(token1) + + // add/rm + evictor.RegisterEvictor(token2) + evictor.UnregisterEvictor(token2) + + if evictor.PerformEviction(slotId1, mem1, cpu1) { + t.Fatalf("We should not be able to self evict") + } + if evictor.PerformEviction("foo", 0, 0) { + t.Fatalf("We should not be able to evict: zero cpu/mem") + } + if token1.isEvicted() { + t.Fatalf("should not be evicted") + } + + evictor.UnregisterEvictor(token1) + + // not registered... 
but should be OK + evictor.UnregisterEvictor(token2) + + if evictor.PerformEviction("foo", mem1, cpu1) { + t.Fatalf("We should not be able to evict (unregistered)") + } + if token1.isEvicted() { + t.Fatalf("should not be evicted") + } + if token2.isEvicted() { + t.Fatalf("should not be evicted (not registered)") + } +} + +func TestEvictorSimple03(t *testing.T) { + evictor := NewEvictor() + + taboo := "foo" + slotId := "slot1" + id0, slotId0, mem0, cpu0 := getACall("id0", taboo, 1, 100) + id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100) + id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100) + id3, _, mem3, cpu3 := getACall("id3", slotId, 1, 100) + + token0 := evictor.GetEvictor(id0, slotId0, mem0, cpu0) + token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1) + token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2) + token3 := evictor.GetEvictor(id3, slotId, mem3, cpu3) + + evictor.RegisterEvictor(token0) + evictor.RegisterEvictor(token1) + evictor.RegisterEvictor(token2) + evictor.RegisterEvictor(token3) + + if !evictor.PerformEviction(taboo, 1, 200) { + t.Fatalf("We should be able to evict") + } + + // same slot id should not be evicted... + if token0.isEvicted() { + t.Fatalf("should not be evicted") + } + if !token1.isEvicted() { + t.Fatalf("should be evicted") + } + if !token2.isEvicted() { + t.Fatalf("should be evicted") + } + // two tokens should be enough... + if token3.isEvicted() { + t.Fatalf("should not be evicted") + } + + evictor.UnregisterEvictor(token0) + evictor.UnregisterEvictor(token1) + evictor.UnregisterEvictor(token2) + evictor.UnregisterEvictor(token3) +} diff --git a/api/agent/resource.go b/api/agent/resource.go index d4ecfbf18..51c985c60 100644 --- a/api/agent/resource.go +++ b/api/agent/resource.go @@ -48,9 +48,6 @@ type ResourceTracker interface { // machine. It must be called before GetResourceToken or GetResourceToken may hang. // Memory is expected to be provided in MB units.
IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool - - // returns number of waiters waiting for a resource token blocked on condition variable - GetResourceTokenWaiterCount() uint64 } type resourceTracker struct { @@ -77,8 +74,6 @@ type resourceTracker struct { cpuAsyncUsed uint64 // cpu in use for async area in which agent stops dequeuing async jobs cpuAsyncHWMark uint64 - // number of waiters waiting for a token blocked on the condition variable - tokenWaiterCount uint64 } func NewResourceTracker(cfg *AgentConfig) ResourceTracker { @@ -142,17 +137,6 @@ func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isA } } -// returns number of waiters waiting for a resource token blocked on condition variable -func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 { - var waiters uint64 - - a.cond.L.Lock() - waiters = a.tokenWaiterCount - a.cond.L.Unlock() - - return waiters -} - func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken { var asyncMem, syncMem uint64 @@ -271,9 +255,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c isWaiting = true for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil { - a.tokenWaiterCount++ c.Wait() - a.tokenWaiterCount-- } isWaiting = false diff --git a/api/agent/slots.go b/api/agent/slots.go index 3e8508978..7a9f67e31 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -243,14 +243,7 @@ func (a *slotQueue) exitContainerState(conType ContainerStateType) { // getSlot must ensure that if it receives a slot, it will be returned, otherwise // a container will be locked up forever waiting for slot to free. -func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) { - - var key string - if call.slotHashId != "" { - key = call.slotHashId - } else { - key = getSlotQueueKey(call) - } +func (a *slotQueueMgr) getSlotQueue(key string) (*slotQueue, bool) { a.hMu.Lock() slots, ok := a.hot[key] diff --git a/docs/operating/options.md b/docs/operating/options.md index 9a049e07b..a66d6cb6d 100644 --- a/docs/operating/options.md +++ b/docs/operating/options.md @@ -31,7 +31,7 @@ docker run -e VAR_NAME=VALUE ... | `FN_API_CORS_ORIGINS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None | | `FN_API_CORS_HEADERS` | A comma separated list of Headers to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for. This corresponds to the allowed headers in the `Access-Control-Allow-Headers` header. | Origin,Content-Length,Content-Type | | `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 | -| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds to periodically check to terminate an idle hot container if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 | +| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds for an idle hot container to become eligible for eviction if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. 
| 1000 | | `FN_MAX_RESPONSE_SIZE` | Set this option to specify the http body or json response size in bytes from the containers. | 0 (off) | | `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock | | `DOCKER_API_VERSION` | Docker remote API version. | 1.24 |
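Note for operators: after this patch, FN_EJECT_IDLE_MSECS describes an eligibility window rather than a polling interval. Below is a minimal sketch of the container-side gate it controls, assuming it lives in the agent package next to the evictor.go added above; the negative-value/disable handling and the real runHotReq bookkeeping are elided, and gotWork is a hypothetical stand-in for the slot being dequeued.

    package agent

    import "time"

    // idleGate sketches the container side: after ejectIdle (FN_EJECT_IDLE_MSECS)
    // of idleness the container registers its EvictToken, making itself eligible
    // for eviction; it unregisters if work arrives, or shuts down if evicted.
    func idleGate(ev Evictor, token *EvictToken, ejectIdle time.Duration, gotWork <-chan struct{}) {
        ejectTimer := time.NewTimer(ejectIdle)
        defer ejectTimer.Stop()

        select {
        case <-ejectTimer.C:
            ev.RegisterEvictor(token) // idle long enough: now evictable
        case <-gotWork:
            return // a request arrived first; never entered the eviction pool
        }

        select {
        case <-token.C:
            // evicted: the hot container should tear itself down
        case <-gotWork:
            ev.UnregisterEvictor(token) // back to work: leave the eviction pool
        }
    }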