fn: agent eviction revisited (#1131)

* fn: agent eviction revisited

Previously, the hot-container eviction logic used the
number of waiters for cpu/mem resources to decide whether
to evict a container. An ejection ticker woke up each
container every 1 sec to reassess system load based on
the waiter count. However, this does not work for the
non-blocking agent, since there are no waiters in
non-blocking mode.

Background on blocking versus non-blocking agent:
    *) The blocking agent holds a request until the
    request is serviced or the client times out. It assumes
    the request can eventually be serviced when idle
    containers eject themselves or busy containers finish
    their work.
    *) Non-blocking mode tries to limit this wait time.
    However, the non-blocking agent has never been truly
    non-blocking. It simply means that we only make a
    request wait if we take some action in the system.
    Non-blocking agents are configured with a much higher
    hot-poll frequency to make the system more responsive,
    as well as to handle cases where a too-busy event is
    missed by the request. This can happen because the
    communication between the hot launcher and waiting
    requests is not 1-1 and is lossy: another request may
    arrive for the same slot queue and receive the too-busy
    response before the original request.

This change introduces an evictor where each hot container
can register itself once it has been idle for more than
1 second. Upon registration, these idle containers become
eligible for eviction.
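
For illustration, here is a minimal sketch of how a
registered hot container interacts with this evictor. It is
not the actual runHotReq code; the function name, parameters
and shutdown channel are made up for the example, but the
Evictor calls match the new API added below:

package agent

import "time"

// idleContainerSketch is an illustrative stand-in for the container's
// idle loop. It creates an eviction token, registers it once the
// container has been idle past EjectIdle, and reacts to the token's
// channel being closed by PerformEviction.
func idleContainerSketch(ev Evictor, callID, slotKey string, memMB, cpuMilli uint64, shutdown <-chan struct{}) {
	tok := ev.GetEvictor(callID, slotKey, memMB, cpuMilli)

	eject := time.NewTimer(1 * time.Second) // FN_EJECT_IDLE_MSECS default
	defer eject.Stop()

	registered := false
	for {
		select {
		case <-eject.C:
			// idle for more than EjectIdle: advertise ourselves as evictable
			ev.RegisterEvictor(tok)
			registered = true
		case <-tok.C:
			// a starved request called PerformEviction and chose us: terminate
			return
		case <-shutdown:
			if registered {
				// exiting for another reason: stop advertising this container
				ev.UnregisterEvictor(tok)
			}
			return
		}
	}
}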

In the hot container launcher, in non-blocking mode, we
now attempt an eviction before emitting a too-busy
response. If the eviction succeeds, the request waits
some more. Requests may therefore wait longer than they
used to, but only if a container was actually evicted.
In blocking mode, the hot launcher uses the hot-poll
period to decide whether a request has waited too long,
and if so it triggers an eviction.
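
A condensed sketch of that launcher-side behavior follows.
It is illustrative only: launchSketch, the tokens channel
and the notify channel stand in for the real resource-token
plumbing in checkLaunch shown in the diff below.

package agent

import (
	"context"
	"time"
)

// launchSketch waits for resources and falls back to eviction, roughly
// mirroring the non-blocking (eviction before too-busy) and blocking
// (eviction after a hot-poll period) paths described above.
func launchSketch(ctx context.Context, ev Evictor, slotKey string, memMB, cpuMilli uint64,
	tokens <-chan error, hotPoll time.Duration, notify chan<- error) {

	select {
	case resourceErr := <-tokens:
		if resourceErr != nil {
			// non-blocking mode: before answering too-busy, try to free room
			// by evicting idle containers registered from other slot queues.
			if !ev.PerformEviction(slotKey, memMB, cpuMilli) {
				// eviction did not help, report the resource error (non-blocking notify)
				select {
				case notify <- resourceErr:
				default:
				}
			}
			// if eviction succeeded, the request simply waits and retries
			return
		}
		// resources granted: a hot container would be launched here (omitted)
	case <-time.After(hotPoll):
		// blocking mode: a full hot-poll period passed without resources,
		// assume the request has waited long enough and attempt an eviction.
		ev.PerformEviction(slotKey, memMB, cpuMilli)
	case <-ctx.Done():
		// request timed out or was cancelled
	}
}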
Tolga Ceylan
2018-07-19 15:04:15 -07:00
committed by GitHub
parent 8e373005a0
commit 1258baeb7f
6 changed files with 365 additions and 40 deletions


@@ -97,6 +97,7 @@ type agent struct {
 	driver  drivers.Driver
 	slotMgr *slotQueueMgr
+	evictor Evictor
 	// track usage
 	resources ResourceTracker
@@ -129,6 +130,7 @@ func New(da CallHandler, options ...AgentOption) Agent {
 	a.shutWg = common.NewWaitGroup()
 	a.da = da
 	a.slotMgr = NewSlotQueueMgr()
+	a.evictor = NewEvictor()
 	// Allow overriding config
 	for _, option := range options {
@@ -421,7 +423,12 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
 	if protocol.IsStreamable(protocol.Protocol(call.Format)) {
 		// For hot requests, we use a long lived slot queue, which we use to manage hot containers
 		var isNew bool
-		call.slots, isNew = a.slotMgr.getSlotQueue(call)
+		if call.slotHashId == "" {
+			call.slotHashId = getSlotQueueKey(call)
+		}
+		call.slots, isNew = a.slotMgr.getSlotQueue(call.slotHashId)
 		call.requestState.UpdateState(ctx, RequestStateWait, call.slots)
 		if isNew {
 			go a.hotLauncher(ctx, call)
@@ -501,7 +508,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	state := NewContainerState()
 	state.UpdateState(ctx, ContainerStateWait, call.slots)
-	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Info("Hot function launcher starting hot container")
+	common.Logger(ctx).WithFields(logrus.Fields{"currentStats": call.slots.getStats(), "isNeeded": isNeeded}).Debug("Hot function launcher attempting to start a container")
 	mem := call.Memory + uint64(call.TmpFsSize)
@@ -525,18 +532,27 @@ func (a *agent) checkLaunch(ctx context.Context, call *call, notifyChan chan err
 	select {
 	case tok := <-a.resources.GetResourceToken(ctx, mem, uint64(call.CPUs), isAsync, isNB):
 		if tok != nil && tok.Error() != nil {
-			tryNotify(notifyChan, tok.Error())
+			// before returning error response, as a last resort, try evicting idle containers.
+			if tok.Error() != CapacityFull || !a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs)) {
+				tryNotify(notifyChan, tok.Error())
+			}
 		} else if a.shutWg.AddSession(1) {
 			go func() {
 				// NOTE: runHot will not inherit the timeout from ctx (ignore timings)
 				a.runHot(ctx, call, tok, state)
 				a.shutWg.DoneSession()
 			}()
+			// early return (do not allow container state to switch to ContainerStateDone)
 			return
 		}
 		if tok != nil {
 			tok.Close()
 		}
+	// Request routines are polling us with this a.cfg.HotPoll frequency. We can use this
+	// same timer to assume that we waited for cpu/mem long enough. Let's try to evict an
+	// idle container.
+	case <-time.After(a.cfg.HotPoll):
+		a.evictor.PerformEviction(call.slotHashId, mem, uint64(call.CPUs))
 	case <-ctx.Done(): // timeout
 	case <-a.shutWg.Closer(): // server shutdown
 	}
@@ -913,14 +929,15 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 	var err error
 	isFrozen := false
+	isEvictable := false
 	freezeTimer := time.NewTimer(a.cfg.FreezeIdle)
 	idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
-	ejectTicker := time.NewTicker(a.cfg.EjectIdle)
+	ejectTimer := time.NewTimer(a.cfg.EjectIdle)
 	defer freezeTimer.Stop()
 	defer idleTimer.Stop()
-	defer ejectTicker.Stop()
+	defer ejectTimer.Stop()
 	// log if any error is encountered
 	defer func() {
@@ -938,6 +955,8 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 		isFrozen = true
 	}
+	evictor := a.evictor.GetEvictor(call.ID, call.slotHashId, call.Memory+uint64(call.TmpFsSize), uint64(call.CPUs))
 	state.UpdateState(ctx, ContainerStateIdle, call.slots)
 	s := call.slots.queueSlot(slot)
@@ -956,19 +975,21 @@ func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState,
 				isFrozen = true
 			}
 			continue
-		case <-ejectTicker.C:
-			// if someone is waiting for resource in our slot queue, we must not terminate,
-			// otherwise, see if other slot queues have resource waiters that are blocked.
-			stats := call.slots.getStats()
-			if stats.containerStates[ContainerStateWait] > 0 ||
-				a.resources.GetResourceTokenWaiterCount() <= 0 {
-				continue
-			}
+		case <-evictor.C:
 			logger.Debug("attempting hot function eject")
+		case <-ejectTimer.C:
+			// we've been idle too long, now we are ejectable
+			a.evictor.RegisterEvictor(evictor)
+			isEvictable = true
+			continue
 		}
 		break
 	}
+	if isEvictable {
+		a.evictor.UnregisterEvictor(evictor)
+	}
 	// if we can acquire token, that means we are here due to
 	// abort/shutdown/timeout, attempt to acquire and terminate,
 	// otherwise continue processing the request

api/agent/evictor.go (new file, 186 lines)

@@ -0,0 +1,186 @@
package agent

import (
	"sync"
)

// Evictor For Agent
// Agent hot containers can register themselves as evictable using
// Register/Unregister calls. If a hot container registers itself,
// a starved request can call PerformEviction() to scan the eligible
// hot containers and if a number of these can be evicted to satisfy
// memory+cpu needs of the starved request, then those hot-containers
// are evicted (which is signalled using their channel.)

type tokenKey struct {
	id     string
	slotId string
	memory uint64
	cpu    uint64
}

type EvictToken struct {
	key tokenKey
	C   chan struct{}
}

type Evictor interface {
	// Create an eviction token to be used in register/unregister functions
	GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken
	// register an eviction token with the evictor system
	RegisterEvictor(token *EvictToken)
	// unregister an eviction token from the evictor system
	UnregisterEvictor(token *EvictToken)
	// perform eviction to satisfy resource requirements of the call;
	// returns true if evictions were performed to satisfy the requirements.
	PerformEviction(slotId string, mem, cpu uint64) bool
}

type evictor struct {
	lock   sync.Mutex
	id     uint64
	tokens map[string]*EvictToken
	slots  []tokenKey
}

func NewEvictor() Evictor {
	return &evictor{
		tokens: make(map[string]*EvictToken),
		slots:  make([]tokenKey, 0),
	}
}

func (tok *EvictToken) isEvicted() bool {
	select {
	case <-tok.C:
		return true
	default:
	}
	return false
}

func (tok *EvictToken) isEligible() bool {
	// if no resource limits are in place, then this
	// function is not eligible.
	if tok.key.memory == 0 && tok.key.cpu == 0 {
		return false
	}
	return true
}

func (e *evictor) GetEvictor(id, slotId string, mem, cpu uint64) *EvictToken {
	key := tokenKey{
		id:     id,
		slotId: slotId,
		memory: mem,
		cpu:    cpu,
	}

	return &EvictToken{
		key: key,
		C:   make(chan struct{}),
	}
}

func (e *evictor) RegisterEvictor(token *EvictToken) {
	if !token.isEligible() || token.isEvicted() {
		return
	}

	e.lock.Lock()

	// be paranoid, do not register if it's already there
	_, ok := e.tokens[token.key.id]
	if !ok {
		e.tokens[token.key.id] = token
		e.slots = append(e.slots, token.key)
	}

	e.lock.Unlock()
}

func (e *evictor) UnregisterEvictor(token *EvictToken) {
	if !token.isEligible() || token.isEvicted() {
		return
	}

	e.lock.Lock()

	for idx, val := range e.slots {
		if val.id == token.key.id {
			e.slots = append(e.slots[:idx], e.slots[idx+1:]...)
			break
		}
	}
	delete(e.tokens, token.key.id)

	e.lock.Unlock()
}

func (e *evictor) PerformEviction(slotId string, mem, cpu uint64) bool {
	// if no resources are defined for this function, then
	// we don't know what to do here. We cannot evict anyone
	// in this case.
	if mem == 0 && cpu == 0 {
		return false
	}

	// Our eviction sum so far
	totalMemory := uint64(0)
	totalCpu := uint64(0)
	isSatisfied := false

	var keys []string
	var chans []chan struct{}

	e.lock.Lock()

	for _, val := range e.slots {
		// let's not evict from our own slot queue
		if slotId == val.slotId {
			continue
		}

		totalMemory += val.memory
		totalCpu += val.cpu
		keys = append(keys, val.id)

		// did we satisfy the need?
		if totalMemory >= mem && totalCpu >= cpu {
			isSatisfied = true
			break
		}
	}

	// If we can satisfy the need, then let's commit/perform the eviction
	if isSatisfied {
		chans = make([]chan struct{}, 0, len(keys))
		idx := 0
		for _, id := range keys {
			// do not reinitialize idx, we continue where we left off
			// since keys are in order from above.
			for ; idx < len(e.slots); idx++ {
				if id == e.slots[idx].id {
					e.slots = append(e.slots[:idx], e.slots[idx+1:]...)
					break
				}
			}
			chans = append(chans, e.tokens[id].C)
			delete(e.tokens, id)
		}
	}

	e.lock.Unlock()

	for _, ch := range chans {
		close(ch)
	}

	return isSatisfied
}

api/agent/evictor_test.go (new file, 143 lines)

@@ -0,0 +1,143 @@
package agent

import (
	"testing"
)

func getACall(id, slot string, mem, cpu int) (string, string, uint64, uint64) {
	return id, slot, uint64(mem), uint64(cpu)
}

func TestEvictorSimple01(t *testing.T) {
	evictor := NewEvictor()

	slotId := "slot1"

	id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100)
	id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100)

	token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1)
	token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2)

	evictor.RegisterEvictor(token1)
	evictor.RegisterEvictor(token2)

	if evictor.PerformEviction(slotId, mem1, cpu1) {
		t.Fatalf("We should not be able to self evict")
	}
	if evictor.PerformEviction("foo", 0, 0) {
		t.Fatalf("We should not be able to evict: zero cpu/mem")
	}
	if evictor.PerformEviction("foo", 1, 300) {
		t.Fatalf("We should not be able to evict (not enough resources)")
	}

	if token1.isEvicted() {
		t.Fatalf("should not be evicted")
	}
	if token2.isEvicted() {
		t.Fatalf("should not be evicted")
	}

	if !evictor.PerformEviction("foo", 1, 100) {
		t.Fatalf("We should be able to evict")
	}

	if !token1.isEvicted() {
		t.Fatalf("should be evicted")
	}
	if token2.isEvicted() {
		t.Fatalf("should not be evicted")
	}

	evictor.UnregisterEvictor(token1)
	evictor.UnregisterEvictor(token2)
}

func TestEvictorSimple02(t *testing.T) {
	evictor := NewEvictor()

	id1, slotId1, mem1, cpu1 := getACall("id1", "slot1", 1, 100)
	id2, slotId2, mem2, cpu2 := getACall("id2", "slot1", 1, 100)

	token1 := evictor.GetEvictor(id1, slotId1, mem1, cpu1)
	token2 := evictor.GetEvictor(id2, slotId2, mem2, cpu2)

	// add/rm/add
	evictor.RegisterEvictor(token1)
	evictor.UnregisterEvictor(token1)
	evictor.RegisterEvictor(token1)

	// add/rm
	evictor.RegisterEvictor(token2)
	evictor.UnregisterEvictor(token2)

	if evictor.PerformEviction(slotId1, mem1, cpu1) {
		t.Fatalf("We should not be able to self evict")
	}
	if evictor.PerformEviction("foo", 0, 0) {
		t.Fatalf("We should not be able to evict: zero cpu/mem")
	}

	if token1.isEvicted() {
		t.Fatalf("should not be evicted")
	}

	evictor.UnregisterEvictor(token1)
	// not registered... but should be OK
	evictor.UnregisterEvictor(token2)

	if evictor.PerformEviction("foo", mem1, cpu1) {
		t.Fatalf("We should not be able to evict (unregistered)")
	}

	if token1.isEvicted() {
		t.Fatalf("should not be evicted")
	}
	if token2.isEvicted() {
		t.Fatalf("should not be evicted (not registered)")
	}
}

func TestEvictorSimple03(t *testing.T) {
	evictor := NewEvictor()

	taboo := "foo"
	slotId := "slot1"

	id0, slotId0, mem0, cpu0 := getACall("id0", taboo, 1, 100)
	id1, _, mem1, cpu1 := getACall("id1", slotId, 1, 100)
	id2, _, mem2, cpu2 := getACall("id2", slotId, 1, 100)
	id3, _, mem3, cpu3 := getACall("id3", slotId, 1, 100)

	token0 := evictor.GetEvictor(id0, slotId0, mem0, cpu0)
	token1 := evictor.GetEvictor(id1, slotId, mem1, cpu1)
	token2 := evictor.GetEvictor(id2, slotId, mem2, cpu2)
	token3 := evictor.GetEvictor(id3, slotId, mem3, cpu3)

	evictor.RegisterEvictor(token0)
	evictor.RegisterEvictor(token1)
	evictor.RegisterEvictor(token2)
	evictor.RegisterEvictor(token3)

	if !evictor.PerformEviction(taboo, 1, 200) {
		t.Fatalf("We should be able to evict")
	}

	// same slot id should not be evicted...
	if token0.isEvicted() {
		t.Fatalf("should not be evicted")
	}
	if !token1.isEvicted() {
		t.Fatalf("should be evicted")
	}
	if !token2.isEvicted() {
		t.Fatalf("should be evicted")
	}
	// two tokens should be enough...
	if token3.isEvicted() {
		t.Fatalf("should not be evicted")
	}

	evictor.UnregisterEvictor(token0)
	evictor.UnregisterEvictor(token1)
	evictor.UnregisterEvictor(token2)
	evictor.UnregisterEvictor(token3)
}


@@ -48,9 +48,6 @@ type ResourceTracker interface {
 	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
 	// Memory is expected to be provided in MB units.
 	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool
-	// returns number of waiters waiting for a resource token blocked on condition variable
-	GetResourceTokenWaiterCount() uint64
 }

 type resourceTracker struct {
@@ -77,8 +74,6 @@ type resourceTracker struct {
 	cpuAsyncUsed uint64
 	// cpu in use for async area in which agent stops dequeuing async jobs
 	cpuAsyncHWMark uint64
-	// number of waiters waiting for a token blocked on the condition variable
-	tokenWaiterCount uint64
 }

 func NewResourceTracker(cfg *AgentConfig) ResourceTracker {
@@ -142,17 +137,6 @@ func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota uint64, isA
 	}
 }

-// returns number of waiters waiting for a resource token blocked on condition variable
-func (a *resourceTracker) GetResourceTokenWaiterCount() uint64 {
-	var waiters uint64
-	a.cond.L.Lock()
-	waiters = a.tokenWaiterCount
-	a.cond.L.Unlock()
-	return waiters
-}
-
 func (a *resourceTracker) allocResourcesLocked(memory, cpuQuota uint64, isAsync bool) ResourceToken {
 	var asyncMem, syncMem uint64
@@ -271,9 +255,7 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
 	isWaiting = true
 	for !a.isResourceAvailableLocked(memory, cpuQuota, isAsync) && ctx.Err() == nil {
-		a.tokenWaiterCount++
 		c.Wait()
-		a.tokenWaiterCount--
 	}
 	isWaiting = false


@@ -243,14 +243,7 @@ func (a *slotQueue) exitContainerState(conType ContainerStateType) {
 // getSlot must ensure that if it receives a slot, it will be returned, otherwise
 // a container will be locked up forever waiting for slot to free.
-func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
-	var key string
-	if call.slotHashId != "" {
-		key = call.slotHashId
-	} else {
-		key = getSlotQueueKey(call)
-	}
+func (a *slotQueueMgr) getSlotQueue(key string) (*slotQueue, bool) {
 	a.hMu.Lock()
 	slots, ok := a.hot[key]


@@ -31,7 +31,7 @@ docker run -e VAR_NAME=VALUE ...
 | `FN_API_CORS_ORIGINS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None |
 | `FN_API_CORS_HEADERS` | A comma separated list of Headers to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for. This corresponds to the allowed headers in the `Access-Control-Allow-Headers` header. | Origin,Content-Length,Content-Type |
 | `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 |
-| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds to periodically check to terminate an idle hot container if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 |
+| `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds for an idle hot container to become eligible for eviction if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 |
 | `FN_MAX_RESPONSE_SIZE` | Set this option to specify the http body or json response size in bytes from the containers. | 0 (off) |
 | `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock |
 | `DOCKER_API_VERSION` | Docker remote API version. | 1.24 |