diff --git a/api/agent/agent.go b/api/agent/agent.go
index 1e857ad35..f68674721 100644
--- a/api/agent/agent.go
+++ b/api/agent/agent.go
@@ -97,6 +97,7 @@ type agent struct {
 	driver drivers.Driver
 
 	slotMgr *slotQueueMgr
+	evictor Evictor
 	// track usage
 	resources ResourceTracker
 
@@ -129,6 +130,7 @@ func New(da CallHandler, options ...AgentOption) Agent {
 	a.shutWg = common.NewWaitGroup()
 	a.da = da
 	a.slotMgr = NewSlotQueueMgr()
+	a.evictor = NewEvictor()
 
 	// Allow overriding config
 	for _, option := range options {
@@ -421,7 +423,12 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
-		call.slots, isNew = a.slotMgr.getSlotQueue(call)
+
+		if call.slotHashId == "" {
+			call.slotHashId = getSlotQueueKey(call)
+		}
+
+		call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId)
 		call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
 		if isNew {
 			go a.hotLauncher(ctx, call)
@@ -501,7 +508,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	state := NewContainerState()
 	state.UpdateState(ctx, ContainerStateWait, call.slots)
 
-	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
+	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Debug("Hot function launcher attempting to start a container")
 
 	mem := call.Memory + uint64(call.TmpFsSize)
 
@@ -525,18 +532,27 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	select {
 	case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
 		if tok != nil && tok.Error() != nil {
-			tryNotify(notifyChan, tok.Error())
+			// before returning an error response, as a last resort, try evicting idle containers
+			if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
+				tryNotify(notifyChan, tok.Error())
+			}
 		} else if a.shutWg.AddSession(1) {
 			go func() {
 				// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
 				a.runHot(ctx, call, tok, state)
 				a.shutWg.DoneSession()
 			}()
+			// early return (do not allow container state to switch to ContainerStateDone)
 			return
 		}
 		if tok != nil {
 			tok.Close()
 		}
+	// Request routines poll us at a.cfg.HotPoll frequency, so we can use the
+	// same timer to assume that we have waited long enough for cpu/mem. Let's
+	// try to evict an idle container.
+	case <-time.After(a.cfg.HotPoll):
+		a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
 	case <-ctx.Done(): // timeout
 	case <-a.shutWg.Closer(): // server shutdown
 	}
@@ -913,14 +929,15 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 
 	var err error
 	isFrozen := false
+	isEvictable := false
 
 	freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
 	idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
-	ejectTicker := time.NewTicker(a.cfg.EjectIdle)
+	ejectTimer := time.NewTimer(a.cfg.EjectIdle)
 
 	defer freezeTimer.Stop()
 	defer idleTimer.Stop()
-	defer ejectTicker.Stop()
+	defer ejectTimer.Stop()
 
 	// log if any error is encountered
 	defer func() {
@@ -938,6 +955,8 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 		isFrozen = true
 	}
 
+	evictor := a.evictor.GetEvictor(call.ID, call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
+
 	state.UpdateState(ctx, ContainerStateIdle, call.slots)
 
 	s := call.slots.queueSlot(slot)
@@ -956,19 +975,21 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 				isFrozen = true
 			}
 			continue
-		case <-ejectTicker.C:
-			// if someone is waiting for resource in our slot queue, we must not terminate,
-			// otherwise, see if other slot queues have resource waiters that are blocked.
-			stats := call.slots.getStats()
-			if stats.containerStates[ContainerStateWait] > 0 ||
-				a.resources.GetResourceTokenWaiterCount() <= 0 {
-				continue
-			}
+		case <-evictor.C:
 			logger.Debug("attempting hot function eject")
+		case <-ejectTimer.C:
+			// we've been idle too long; we are now evictable
+			a.evictor.RegisterEvictor(evictor)
+			isEvictable = true
+			continue
 		}
 		break
 	}
 
+	if isEvictable {
+		a.evictor.UnregisterEvictor(evictor)
+	}
+
 	// if we can acquire token, that means we are here due to
 	// abort/shutdown/timeout, attempt to acquire and terminate,
 	// otherwise continue processing the request
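The checkLaunch() changes above make eviction a last-resort fallback on the request path: a starved request first asks for a resource token, and only a CapacityFull error (or waiting out one HotPoll interval) triggers an eviction attempt against other functions' idle containers. Below is a minimal, self-contained sketch of that retry shape, using a toy pool in place of ResourceTracker and Evictor (errCapacityFull, acquire, and evict are stand-ins, not the agent's API):

package main

import (
	"errors"
	"fmt"
	"time"
)

// errCapacityFull stands in for the agent's CapacityFull sentinel error.
var errCapacityFull = errors.New("capacity full")

// pool is a toy resource pool; evict frees capacity the way
// Evictor.PerformEviction would, by shutting down idle containers.
type pool struct{ freeMem, freeCPU uint64 }

func (p *pool) acquire(mem, cpu uint64) error {
	if mem > p.freeMem || cpu > p.freeCPU {
		return errCapacityFull
	}
	p.freeMem -= mem
	p.freeCPU -= cpu
	return nil
}

func (p *pool) evict(mem, cpu uint64) bool {
	// pretend an idle container holding the needed resources was found
	p.freeMem += mem
	p.freeCPU += cpu
	return true
}

func main() {
	p := &pool{}
	hotPoll := 10 * time.Millisecond

	mem, cpu := uint64(128), uint64(100)
	for attempt := 0; attempt < 3; attempt++ {
		if err := p.acquire(mem, cpu); err == nil {
			fmt.Println("launched container on attempt", attempt)
			return
		} else if !errors.Is(err, errCapacityFull) || !p.evict(mem, cpu) {
			// same shape as checkLaunch: anything other than a successful
			// eviction on CapacityFull surfaces the error to the caller
			fmt.Println("error:", err)
			return
		}
		time.Sleep(hotPoll) // stand-in for the <-time.After(a.cfg.HotPoll) case
	}
}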
diff --git a/api/agent/evictor.go b/api/agent/evictor.go
new file mode 100644
index 000000000..2a4662e5b
--- /dev/null
+++ b/api/agent/evictor.go
@@ -0,0 +1,186 @@
+package agent
+
+import (
+	"sync"
+)
+
+// Evictor for the Agent
+// Agent hot containers can register themselves as evictable using the
+// Register/Unregister calls. Once hot containers are registered, a
+// starved request can call PerformEviction() to scan the eligible hot
+// containers; if evicting a number of them would satisfy the memory+cpu
+// needs of the starved request, those hot containers are evicted, which
+// is signalled by closing their channels.
+
+type tokenKey struct {
+	id     string
+	slotId string
+	memory uint64
+	cpu    uint64
+}
+
+type EvictToken struct {
+	key tokenKey
+	C   chan struct{}
+}
+
+type Evictor interface {
+	// GetEvictor creates an eviction token to be used with the register/unregister functions
+	GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken
+
+	// RegisterEvictor registers an eviction token with the evictor system
+	RegisterEvictor(token *EvictToken)
+
+	// UnregisterEvictor removes an eviction token from the evictor system
+	UnregisterEvictor(token *EvictToken)
+
+	// PerformEviction evicts idle containers to satisfy the resource requirements
+	// of the call; it returns true if evictions satisfying them were performed.
+	PerformEviction(slotId string, mem, cpu uint64) bool
+}
+
+type evictor struct {
+	lock   sync.Mutex
+	id     uint64
+	tokens map[string]*EvictToken
+	slots  []tokenKey
+}
+
+func NewEvictor() Evictor {
+	return &evictor{
+		tokens: make(map[string]*EvictToken),
+		slots:  make([]tokenKey, 0),
+	}
+}
+
+func (tok *EvictToken) isEvicted() bool {
+	select {
+	case <-tok.C:
+		return true
+	default:
+	}
+	return false
+}
+
+func (tok *EvictToken) isEligible() bool {
+	// if no resource limits are in place, then this
+	// function is not eligible for eviction.
+	if tok.key.memory == 0 && tok.key.cpu == 0 {
+		return false
+	}
+	return true
+}
+
+func (e *evictor) GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken {
+	key := tokenKey{
+		id:     id,
+		slotId: slotId,
+		memory: mem,
+		cpu:    cpu,
+	}
+
+	return &EvictToken{
+		key: key,
+		C:   make(chan struct{}),
+	}
+}
+
+func (e *evictor) RegisterEvictor(token *EvictToken) {
+	if !token.isEligible() || token.isEvicted() {
+		return
+	}
+
+	e.lock.Lock()
+
+	// be paranoid: do not register if it is already there
+	_, ok := e.tokens[token.key.id]
+	if !ok {
+		e.tokens[token.key.id] = token
+		e.slots = append(e.slots, token.key)
+	}
+
+	e.lock.Unlock()
+}
+
+func (e *evictor) UnregisterEvictor(token *EvictToken) {
+	if !token.isEligible() || token.isEvicted() {
+		return
+	}
+
+	e.lock.Lock()
+
+	for idx, val := range e.slots {
+		if val.id == token.key.id {
+			e.slots = append(e.slots[:idx], e.slots[idx+1:]...)
+			break
+		}
+	}
+	delete(e.tokens, token.key.id)
+
+	e.lock.Unlock()
+}
+
+func (e *evictor) PerformEviction(slotId string, mem, cpu uint64) bool {
+	// if no resources are defined for this function, then we do not
+	// know what to do here; we cannot evict anyone in this case.
+	if mem == 0 && cpu == 0 {
+		return false
+	}
+
+	// our eviction sum so far
+	totalMemory := uint64(0)
+	totalCpu := uint64(0)
+	isSatisfied := false
+
+	var keys []string
+	var chans []chan struct{}
+
+	e.lock.Lock()
+
+	for _, val := range e.slots {
+		// let's not evict from our own slot queue
+		if slotId == val.slotId {
+			continue
+		}
+
+		totalMemory += val.memory
+		totalCpu += val.cpu
+		keys = append(keys, val.id)
+
+		// did we satisfy the need?
+		if totalMemory >= mem && totalCpu >= cpu {
+			isSatisfied = true
+			break
+		}
+	}
+
+	// if we can satisfy the need, then let's commit and perform the eviction
+	if isSatisfied {
+
+		chans = make([]chan struct{}, 0, len(keys))
+		idx := 0
+		for _, id := range keys {
+
+			// do not reset idx; we continue where we left off, since
+			// keys are in the same order as e.slots above.
+			for ; idx < len(e.slots); idx++ {
+				if id == e.slots[idx].id {
+					e.slots = append(e.slots[:idx], e.slots[idx+1:]...)
+					break
+				}
+			}
+
+			chans = append(chans, e.tokens[id].C)
+			delete(e.tokens, id)
+		}
+	}
+
+	e.lock.Unlock()
+
+	for _, ch := range chans {
+		close(ch)
+	}
+
+	return isSatisfied
+}
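For reference, this is how runHotReq() earlier in the diff is expected to drive the Evictor: a hot container creates its token up front, registers it only after sitting idle for EjectIdle, and then either observes eviction (its channel is closed by PerformEviction) or unregisters itself when work arrives. A minimal sketch against the interface defined above; evictableIdleWait, the work channel, and the duration are hypothetical helpers, not part of the PR:

package agent // sketch only; assumes it sits next to evictor.go

import "time"

// evictableIdleWait returns true if the container was evicted while idle,
// and false if work arrived first.
func evictableIdleWait(ev Evictor, token *EvictToken, ejectIdle time.Duration, work <-chan struct{}) bool {
	ejectTimer := time.NewTimer(ejectIdle)
	defer ejectTimer.Stop()

	registered := false
	for {
		select {
		case <-work:
			// a request showed up; stop volunteering for eviction
			if registered {
				ev.UnregisterEvictor(token)
			}
			return false
		case <-token.C:
			// a starved request reclaimed our memory/cpu
			return true
		case <-ejectTimer.C:
			// idle long enough: become an eviction candidate and keep waiting
			ev.RegisterEvictor(token)
			registered = true
		}
	}
}

This mirrors the evictor.C / ejectTimer.C select cases in runHotReq(), where the role of the work channel is played by a request dequeuing the slot.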
diff --git a/api/agent/evictor_test.go b/api/agent/evictor_test.go
new file mode 100644
index 000000000..4f9102091
--- /dev/null
+++ b/api/agent/evictor_test.go
@@ -0,0 +1,143 @@
+package agent
+
+import (
+	"testing"
+)
+
+func getACall(id, slot string, mem, cpu int) (string, string, uint64, uint64) {
+	return id, slot, uint64(mem), uint64(cpu)
+}
+
+func TestEvictorSimple01(t *testing.T) {
+	evictor := NewEvictor()
+
+	slotId := "slot1"
+	id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100)
+	id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100)
+
+	token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1)
+	token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2)
+
+	evictor.RegisterEvictor(token1)
+	evictor.RegisterEvictor(token2)
+
+	if evictor.PerformEviction(slotId, mem1, cpu1) {
+		t.Fatalf("We should not be able to self evict")
+	}
+	if evictor.PerformEviction("foo", 0, 0) {
+		t.Fatalf("We should not be able to evict: zero cpu/mem")
+	}
+	if evictor.PerformEviction("foo", 1, 300) {
+		t.Fatalf("We should not be able to evict (not enough resources)")
+	}
+
+	if token1.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+	if token2.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+
+	if !evictor.PerformEviction("foo", 1, 100) {
+		t.Fatalf("We should be able to evict")
+	}
+
+	if !token1.isEvicted() {
+		t.Fatalf("should be evicted")
+	}
+	if token2.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+
+	evictor.UnregisterEvictor(token1)
+	evictor.UnregisterEvictor(token2)
+}
+
+func TestEvictorSimple02(t *testing.T) {
+	evictor := NewEvictor()
+
+	id1, slotId1, mem1, cpu1 := getACall("id1", "slot1", 1, 100)
+	id2, slotId2, mem2, cpu2 := getACall("id2", "slot1", 1, 100)
+
+	token1 := evictor.GetEvictor(id1, slotId1, mem1, cpu1)
+	token2 := evictor.GetEvictor(id2, slotId2, mem2, cpu2)
+
+	// add/rm/add
+	evictor.RegisterEvictor(token1)
+	evictor.UnregisterEvictor(token1)
+	evictor.RegisterEvictor(token1)
+
+	// add/rm
+	evictor.RegisterEvictor(token2)
+	evictor.UnregisterEvictor(token2)
+
+	if evictor.PerformEviction(slotId1, mem1, cpu1) {
+		t.Fatalf("We should not be able to self evict")
+	}
+	if evictor.PerformEviction("foo", 0, 0) {
+		t.Fatalf("We should not be able to evict: zero cpu/mem")
+	}
+	if token1.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+
+	evictor.UnregisterEvictor(token1)
+
+	// not registered, but unregistering should be OK
+	evictor.UnregisterEvictor(token2)
+
+	if evictor.PerformEviction("foo", mem1, cpu1) {
+		t.Fatalf("We should not be able to evict (unregistered)")
+	}
+	if token1.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+	if token2.isEvicted() {
+		t.Fatalf("should not be evicted (not registered)")
+	}
+}
+
+func TestEvictorSimple03(t *testing.T) {
+	evictor := NewEvictor()
+
+	taboo := "foo"
+	slotId := "slot1"
+	id0, slotId0, mem0, cpu0 := getACall("id0", taboo, 1, 100)
+	id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100)
+	id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100)
+	id3, _, mem3, cpu3 := getACall("id3", slotId, 1, 100)
+
+	token0 := evictor.GetEvictor(id0, slotId0, mem0, cpu0)
+	token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1)
+	token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2)
+	token3 := evictor.GetEvictor(id3, slotId, mem3, cpu3)
+
+	evictor.RegisterEvictor(token0)
+	evictor.RegisterEvictor(token1)
+	evictor.RegisterEvictor(token2)
+	evictor.RegisterEvictor(token3)
+
+	if !evictor.PerformEviction(taboo, 1, 200) {
+		t.Fatalf("We should be able to evict")
+	}
+
+	// tokens with the same slot id as the requester should not be evicted...
+	if token0.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+	if !token1.isEvicted() {
+		t.Fatalf("should be evicted")
+	}
+	if !token2.isEvicted() {
+		t.Fatalf("should be evicted")
+	}
+	// ...and two tokens should be enough to satisfy the request
+	if token3.isEvicted() {
+		t.Fatalf("should not be evicted")
+	}
+
+	evictor.UnregisterEvictor(token0)
+	evictor.UnregisterEvictor(token1)
+	evictor.UnregisterEvictor(token2)
+	evictor.UnregisterEvictor(token3)
+}
diff --git a/api/agent/resource.go b/api/agent/resource.go
index d4ecfbf18..51c985c60 100644
--- a/api/agent/resource.go
+++ b/api/agent/resource.go
@@ -48,9 +48,6 @@ type ResourceTracker interface {
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
 	// Memory is expected to be provided in MB units.
 	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
-
-	// returns number of waiters waiting for a resource token blocked on condition variable
-	GetResourceTokenWaiterCount() uint64
 }
 
 type resourceTracker struct {
@@ -77,8 +74,6 @@ type resourceTracker struct {
 	cpuAsyncUsed uint64
 	// cpu in use for async area in which agent stops dequeuing async jobs
 	cpuAsyncHWMark uint64
-	// number of waiters waiting for a token blocked on the condition variable
-	tokenWaiterCount uint64
 }
 
 func NewResourceTracker(cfg *AgentConfig) ResourceTracker {
@@ -142,17 +137,6 @@ func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isA
 	}
 }
 
-// returns number of waiters waiting for a resource token blocked on condition variable
-func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 {
-	var waiters uint64
-
-	a.cond.L.Lock()
-	waiters = a.tokenWaiterCount
-	a.cond.L.Unlock()
-
-	return waiters
-}
-
 func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken {
 	var asyncMem, syncMem uint64
 
@@ -271,9 +255,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 
 	isWaiting = true
 	for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
-		a.tokenWaiterCount++
 		c.Wait()
-		a.tokenWaiterCount--
 	}
 	isWaiting = false
 
diff --git a/api/agent/slots.go b/api/agent/slots.go
index 3e8508978..7a9f67e31 100644
--- a/api/agent/slots.go
+++ b/api/agent/slots.go
@@ -243,14 +243,7 @@ func (a *slotQueue) exitContainerState(conType ContainerStateType) {
 }
 
 // getSlot must ensure that if it receives a slot, it will be returned, otherwise
 // a container will be locked up forever waiting for slot to free.
-func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
-
-	var key string
-	if call.slotHashId != "" {
-		key = call.slotHashId
-	} else {
-		key = getSlotQueueKey(call)
-	}
+func (a *slotQueueMgr) getSlotQueue(key string) (*slotQueue, bool) {
 	a.hMu.Lock()
 	slots, ok := a.hot[key]
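The slots.go change is the flip side of the getSlot() change in agent.go: getSlotQueue() no longer derives the key itself. Callers compute it once via getSlotQueueKey() and cache it on call.slotHashId, and that same string doubles as the evictor's slotId, which is what lets PerformEviction() skip containers belonging to the requester's own queue. A toy illustration of the compute-once pattern (the fields and key format here are made up, not the real getSlotQueueKey):

package main

import "fmt"

type call struct {
	appName, path, image string
	slotHashId           string // computed once, reused for slot queue and evictor identity
}

// memoKey mimics the slotHashId caching: derive the composite key on first
// use, then hand back the cached value on every later lookup.
func (c *call) memoKey() string {
	if c.slotHashId == "" {
		c.slotHashId = fmt.Sprintf("%s:%s:%s", c.appName, c.path, c.image)
	}
	return c.slotHashId
}

func main() {
	c := &call{appName: "app", path: "/hello", image: "fnproject/hello"}
	fmt.Println(c.memoKey()) // derived and cached
	fmt.Println(c.memoKey()) // cached value, no recomputation
}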
diff --git a/docs/operating/options.md b/docs/operating/options.md
index 9a049e07b..a66d6cb6d 100644
--- a/docs/operating/options.md
+++ b/docs/operating/options.md
@@ -31,7 +31,7 @@ docker run -e VAR_NAME=VALUE ...
 | `FN_API_CORS_ORIGINS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None |
 | `FN_API_CORS_HEADERS` | A comma separated list of Headers to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for. This corresponds to the allowed headers in the `Access-Control-Allow-Headers` header. | Origin,Content-Length,Content-Type |
 | `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 |
-| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds to periodically check to terminate an idle hot container if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 |
+| `FN_EJECT_IDLE_MSECS` | Set this option to specify how long, in milliseconds, a hot container must sit idle before it becomes eligible for eviction when the system is starved for CPU and memory resources. Set to a negative integer to disable this feature. | 1000 |
 | `FN_MAX_RESPONSE_SIZE` | Set this option to specify the http body or json response size in bytes from the containers. | 0 (off) |
 | `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock |
 | `DOCKER_API_VERSION` | Docker remote API version. | 1.24 |