fn: hot container timer improvements (#751)

* fn: hot container timer improvements

With this change, now we are allocating the timers
when the container starts and managing them via
stop/clear as needed, which should not only be more
efficient, but also easier to follow.

For example, previously, if eject time out was
set to 10 secs, this could have delayed idle timeout
up to 10 secs as well. It is also not necessary to do
any math for elapsed time.

Now consumers avoid any requeuing when startDequeuer() is cancelled.
This was triggering additional dequeue/requeue causing
containers to wake up spuriously. Also in startDequeuer(),
we no longer remove the item from the actual queue and
leave this to acquire/eject, which side steps issues related
with item landing in the channel, not consumed, etc.
This commit is contained in:
Tolga Ceylan
2018-02-12 14:12:03 -08:00
committed by GitHub
parent ffcda9b823
commit c848fc6181
4 changed files with 191 additions and 241 deletions

View File

@@ -140,6 +140,11 @@ func New(da DataAccess) Agent {
if err != nil { if err != nil {
logrus.WithError(err).Fatal("error initializing eject idle delay") logrus.WithError(err).Fatal("error initializing eject idle delay")
} }
if ejectIdleMsecs == time.Duration(0) {
logrus.Fatal("eject idle delay cannot be zero")
}
logrus.WithFields(logrus.Fields{"eject_msec": ejectIdleMsecs, "free_msec": freezeIdleMsecs}).Info("agent starting")
a := &agent{ a := &agent{
da: da, da: da,
@@ -445,7 +450,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
for { for {
select { select {
case s := <-ch: case s := <-ch:
if s.acquireSlot() { if call.slots.acquireSlot(s) {
if s.slot.Error() != nil { if s.slot.Error() != nil {
s.slot.Close(ctx) s.slot.Close(ctx)
return nil, s.slot.Error() return nil, s.slot.Error()
@@ -553,9 +558,9 @@ func (s *coldSlot) Close(ctx context.Context) error {
// implements Slot // implements Slot
type hotSlot struct { type hotSlot struct {
done chan<- struct{} // signal we are done with slot done chan struct{} // signal we are done with slot
errC <-chan error // container error errC <-chan error // container error
container *container // TODO mask this container *container // TODO mask this
err error err error
} }
@@ -719,80 +724,13 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
default: // ok default: // ok
} }
isFrozen := false slot := &hotSlot{make(chan struct{}), errC, container, nil}
elapsed := time.Duration(0) if !a.runHotReq(ctx, call, state, logger, cookie, slot) {
freezerTicker := a.freezeIdleMsecs
idleTimeout := time.Duration(call.IdleTimeout) * time.Second
done := make(chan struct{})
state.UpdateState(ctx, ContainerStateIdle, call.slots)
s := call.slots.queueSlot(&hotSlot{done, errC, container, nil})
for {
select {
case <-s.trigger: // slot already consumed
case <-ctx.Done(): // container shutdown
case <-a.shutdown: // server shutdown
case <-time.After(idleTimeout): // in case idleTimeout < a.freezeIdleMsecs or idleTimeout < a.ejectIdleMsecs
case <-time.After(freezerTicker):
elapsed += a.freezeIdleMsecs
freezerTicker = math.MaxInt64 // do not fire again
if elapsed < idleTimeout { // in case idleTimeout <= a.freezeIdleMsecs
if !isFrozen {
err := cookie.Freeze(ctx)
if err != nil {
logger.WithError(err).Error("freeze error")
return
}
isFrozen = true
}
continue
}
case <-time.After(a.ejectIdleMsecs):
elapsed += a.ejectIdleMsecs
if elapsed < idleTimeout {
// if someone is waiting for resource in our slot queue, we must not terminate,
// otherwise, see if other slot queues have resource waiters that are blocked.
stats := call.slots.getStats()
if stats.containerStates[ContainerStateWait] > 0 ||
a.resources.GetResourceTokenWaiterCount() <= 0 {
continue
}
logger.Debug("attempting hot function eject")
}
}
break
}
// if we can eject token, that means we are here due to
// abort/shutdown/timeout, attempt to eject and terminate,
// otherwise continue processing the request
if call.slots.ejectSlot(ctx, s) {
return return
} }
if isFrozen {
err := cookie.Unfreeze(ctx)
if err != nil {
logger.WithError(err).Error("unfreeze error")
return
}
isFrozen = false
}
state.UpdateState(ctx, ContainerStateBusy, call.slots)
// IMPORTANT: if we fail to eject the slot, it means that a consumer
// just dequeued this and acquired the slot. In other words, we were
// late in ejectSlots(), so we have to execute this request in this
// iteration. Beginning of for-loop will re-check ctx/shutdown case
// and terminate after this request is done.
// wait for this call to finish // wait for this call to finish
// NOTE do NOT select with shutdown / other channels. slot handles this. // NOTE do NOT select with shutdown / other channels. slot handles this.
<-done <-slot.done
} }
}() }()
@@ -807,6 +745,91 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
logger.WithError(err).Info("hot function terminated") logger.WithError(err).Info("hot function terminated")
} }
// runHotReq enqueues a free slot to slot queue manager and watches various timers and the consumer until
// the slot is consumed. A return value of false means, the container should shutdown and no subsequent
// calls should be made to this function.
func (a *agent) runHotReq(ctx context.Context, call *call, state ContainerState, logger logrus.FieldLogger, cookie drivers.Cookie, slot *hotSlot) bool {
var err error
isFrozen := false
freezeTimer := time.NewTimer(a.freezeIdleMsecs)
idleTimer := time.NewTimer(time.Duration(call.IdleTimeout) * time.Second)
ejectTicker := time.NewTicker(a.ejectIdleMsecs)
defer freezeTimer.Stop()
defer idleTimer.Stop()
defer ejectTicker.Stop()
// log if any error is encountered
defer func() {
if err != nil {
logger.WithError(err).Error("hot function failure")
}
}()
// if an immediate freeze is requested, freeze first before enqueuing at all.
if a.freezeIdleMsecs == time.Duration(0) && !isFrozen {
err = cookie.Freeze(ctx)
if err != nil {
return false
}
isFrozen = true
}
state.UpdateState(ctx, ContainerStateIdle, call.slots)
s := call.slots.queueSlot(slot)
for {
select {
case <-s.trigger: // slot already consumed
case <-ctx.Done(): // container shutdown
case <-a.shutdown: // server shutdown
case <-idleTimer.C:
case <-freezeTimer.C:
if !isFrozen {
err = cookie.Freeze(ctx)
if err != nil {
return false
}
isFrozen = true
}
continue
case <-ejectTicker.C:
// if someone is waiting for resource in our slot queue, we must not terminate,
// otherwise, see if other slot queues have resource waiters that are blocked.
stats := call.slots.getStats()
if stats.containerStates[ContainerStateWait] > 0 ||
a.resources.GetResourceTokenWaiterCount() <= 0 {
continue
}
logger.Debug("attempting hot function eject")
}
break
}
// if we can acquire token, that means we are here due to
// abort/shutdown/timeout, attempt to acquire and terminate,
// otherwise continue processing the request
if call.slots.acquireSlot(s) {
slot.Close(ctx)
return false
}
// In case, timer/acquireSlot failure landed us here, make
// sure to unfreeze.
if isFrozen {
err = cookie.Unfreeze(ctx)
if err != nil {
return false
}
isFrozen = false
}
state.UpdateState(ctx, ContainerStateBusy, call.slots)
return true
}
// container implements drivers.ContainerTask container is the execution of a // container implements drivers.ContainerTask container is the execution of a
// single container, which may run multiple functions [consecutively]. the id // single container, which may run multiple functions [consecutively]. the id
// and stderr can be swapped out by new calls in the container. input and // and stderr can be swapped out by new calls in the container. input and

View File

@@ -70,25 +70,15 @@ func NewSlotQueue(key string) *slotQueue {
return obj return obj
} }
func (a *slotToken) acquireSlot() bool { func (a *slotQueue) acquireSlot(s *slotToken) bool {
// let's get the lock
if !atomic.CompareAndSwapUint32(&a.isBusy, 0, 1) {
return false
}
// now we have the lock, push the trigger
close(a.trigger)
return true
}
func (a *slotQueue) ejectSlot(ctx context.Context, s *slotToken) bool {
// let's get the lock // let's get the lock
if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) { if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) {
return false return false
} }
a.cond.L.Lock() a.cond.L.Lock()
for i := 0; i < len(a.slots); i++ { // common case: acquired slots are usually at the end
for i := len(a.slots) - 1; i >= 0; i-- {
if a.slots[i].id == s.id { if a.slots[i].id == s.id {
a.slots = append(a.slots[:i], a.slots[i+1:]...) a.slots = append(a.slots[:i], a.slots[i+1:]...)
break break
@@ -96,7 +86,6 @@ func (a *slotQueue) ejectSlot(ctx context.Context, s *slotToken) bool {
} }
a.cond.L.Unlock() a.cond.L.Unlock()
s.slot.Close(ctx)
// now we have the lock, push the trigger // now we have the lock, push the trigger
close(s.trigger) close(s.trigger)
return true return true
@@ -131,19 +120,13 @@ func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken {
return return
} }
// pop
item := a.slots[len(a.slots)-1] item := a.slots[len(a.slots)-1]
a.slots = a.slots[:len(a.slots)-1]
a.cond.L.Unlock() a.cond.L.Unlock()
select { select {
case output <- item: // good case (dequeued) case output <- item: // good case (dequeued)
case <-item.trigger: // ejected (eject handles cleanup) case <-item.trigger: // ejected (eject handles cleanup)
case <-ctx.Done(): // time out or cancel from caller case <-ctx.Done(): // time out or cancel from caller
// consume slot, we let the hot container queue the slot again
if item.acquireSlot() {
item.slot.Close(ctx)
}
} }
} }
}() }()

View File

@@ -37,6 +37,33 @@ func NewTestSlot(id uint64) Slot {
return mySlot return mySlot
} }
func checkGetTokenId(t *testing.T, a *slotQueue, dur time.Duration, id uint64) error {
ctx, cancel := context.WithTimeout(context.Background(), dur)
defer cancel()
outChan := a.startDequeuer(ctx)
for {
select {
case z := <-outChan:
if !a.acquireSlot(z) {
continue
}
z.slot.Close(ctx)
if z.id != id {
return fmt.Errorf("Bad slotToken received: %#v expected: %d", z, id)
}
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
func TestSlotQueueBasic1(t *testing.T) { func TestSlotQueueBasic1(t *testing.T) {
maxId := uint64(10) maxId := uint64(10)
@@ -47,14 +74,14 @@ func TestSlotQueueBasic1(t *testing.T) {
obj := NewSlotQueue(slotName) obj := NewSlotQueue(slotName)
ctx, cancel := context.WithCancel(context.Background()) timeout := time.Duration(500) * time.Millisecond
outChan := obj.startDequeuer(ctx) err := checkGetTokenId(t, obj, timeout, 6)
select { if err == nil {
case z := <-outChan: t.Fatalf("Should not get anything from queue")
t.Fatalf("Should not get anything from queue: %#v", z) }
case <-time.After(time.Duration(500) * time.Millisecond): if err != context.DeadlineExceeded {
t.Fatalf(err.Error())
} }
cancel()
// create slots // create slots
for id := uint64(0); id < maxId; id += 1 { for id := uint64(0); id < maxId; id += 1 {
@@ -76,83 +103,36 @@ func TestSlotQueueBasic1(t *testing.T) {
} }
// Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now. // Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now.
// but let's eject 9 // but let's acquire 9
if !obj.ejectSlot(ctx, tokens[9]) { if !obj.acquireSlot(tokens[9]) {
t.Fatalf("Cannot eject slotToken: %#v", tokens[9]) t.Fatalf("Cannot acquire slotToken: %#v", tokens[9])
} }
// let eject 0 // let acquire 0
if !obj.ejectSlot(ctx, tokens[0]) { if !obj.acquireSlot(tokens[0]) {
t.Fatalf("Cannot eject slotToken: %#v", tokens[0]) t.Fatalf("Cannot acquire slotToken: %#v", tokens[0])
} }
// let eject 5 // let acquire 5
if !obj.ejectSlot(ctx, tokens[5]) { if !obj.acquireSlot(tokens[5]) {
t.Fatalf("Cannot eject slotToken: %#v", tokens[5]) t.Fatalf("Cannot acquire slotToken: %#v", tokens[5])
} }
// try ejecting 5 again, it should fail // try acquire 5 again, it should fail
if obj.ejectSlot(ctx, tokens[5]) { if obj.acquireSlot(tokens[5]) {
t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) t.Fatalf("Shouldn't be able to acquire slotToken: %#v", tokens[5])
} }
ctx, cancel = context.WithCancel(context.Background()) err = checkGetTokenId(t, obj, timeout, 8)
outChan = obj.startDequeuer(ctx) if err != nil {
t.Fatalf(err.Error())
// now we should get 8
select {
case z := <-outChan:
if z.id != 8 {
t.Fatalf("Bad slotToken received: %#v", z)
}
if !z.acquireSlot() {
t.Fatalf("Cannot acquire slotToken received: %#v", z)
}
// second acquire shoudl fail
if z.acquireSlot() {
t.Fatalf("Should not be able to acquire twice slotToken: %#v", z)
}
z.slot.Close(ctx)
case <-time.After(time.Duration(1) * time.Second):
t.Fatal("timeout in waiting slotToken")
} }
// now we should get 7 // acquire 7 before we can consume
select { if !obj.acquireSlot(tokens[7]) {
case z := <-outChan: t.Fatalf("Cannot acquire slotToken: %#v", tokens[2])
if z.id != 7 {
t.Fatalf("Bad slotToken received: %#v", z)
}
// eject it before we can consume
if !obj.ejectSlot(ctx, tokens[7]) {
t.Fatalf("Cannot eject slotToken: %#v", tokens[2])
}
// we shouldn't be able to consume an ejected slotToken
if z.acquireSlot() {
t.Fatalf("We should not be able to acquire slotToken received: %#v", z)
}
case <-time.After(time.Duration(1) * time.Second):
t.Fatal("timeout in waiting slotToken")
} }
cancel() err = checkGetTokenId(t, obj, timeout, 6)
if err != nil {
// we should get nothing or 6 t.Fatalf(err.Error())
select {
case z, ok := <-outChan:
if ok {
if z.id != 6 {
t.Fatalf("Should not get anything except for 6 from queue: %#v", z)
}
if !z.acquireSlot() {
t.Fatalf("cannot acquire token: %#v", z)
}
}
case <-time.After(time.Duration(500) * time.Millisecond):
} }
} }
@@ -164,13 +144,13 @@ func TestSlotQueueBasic2(t *testing.T) {
t.Fatalf("Should be idle") t.Fatalf("Should be idle")
} }
ctx, cancel := context.WithCancel(context.Background()) timeout := time.Duration(500) * time.Millisecond
defer cancel() err := checkGetTokenId(t, obj, timeout, 6)
if err == nil {
select { t.Fatalf("Should not get anything from queue")
case z := <-obj.startDequeuer(ctx): }
t.Fatalf("Should not get anything from queue: %#v", z) if err != context.DeadlineExceeded {
case <-time.After(time.Duration(500) * time.Millisecond): t.Fatalf(err.Error())
} }
} }
@@ -227,92 +207,56 @@ func TestSlotQueueBasic3(t *testing.T) {
slotName := "test3" slotName := "test3"
obj := NewSlotQueue(slotName) obj := NewSlotQueue(slotName)
ctx, cancel := context.WithCancel(context.Background())
obj.startDequeuer(ctx)
slot1 := NewTestSlot(1) slot1 := NewTestSlot(1)
slot2 := NewTestSlot(2) slot2 := NewTestSlot(2)
token1 := obj.queueSlot(slot1) token1 := obj.queueSlot(slot1)
obj.queueSlot(slot2) obj.queueSlot(slot2)
// now our slot must be ready in outChan, but let's cancel it timeout := time.Duration(500) * time.Millisecond
// to cause a requeue. This should cause [1, 2] ordering to [2, 1] err := checkGetTokenId(t, obj, timeout, 1)
cancel() if err != nil {
t.Fatalf(err.Error())
ctx, cancel = context.WithCancel(context.Background())
outChan := obj.startDequeuer(ctx)
// we should get '2' since cancel1() reordered the queue
select {
case item, ok := <-outChan:
if !ok {
t.Fatalf("outChan should be open")
}
inner := item.slot.(*testSlot)
outer := slot2.(*testSlot)
if inner.id != outer.id {
t.Fatalf("item should be 2")
}
if inner.isClosed {
t.Fatalf("2 should not yet be closed")
}
if !item.acquireSlot() {
t.Fatalf("2 acquire should not fail")
}
item.slot.Close(ctx)
case <-time.After(time.Duration(1) * time.Second):
t.Fatal("timeout in waiting slotToken")
} }
// let's eject 1 // let's acquire 1
if !obj.ejectSlot(ctx, token1) { if !obj.acquireSlot(token1) {
t.Fatalf("failed to eject 1") t.Fatalf("should fail to acquire %#v", token1)
}
if !slot1.(*testSlot).isClosed {
t.Fatalf("1 should be closed")
} }
// spin up bunch of go routines, where each should get a non-acquirable
// token or timeout due the imminent obj.destroySlotQueue()
var wg sync.WaitGroup
goMax := 10 goMax := 10
out := make(chan error, goMax)
var wg sync.WaitGroup
wg.Add(goMax) wg.Add(goMax)
for i := 0; i < goMax; i += 1 { for i := 0; i < goMax; i += 1 {
go func(id int) { go func(id int) {
defer wg.Done() defer wg.Done()
err := checkGetTokenId(t, obj, timeout, 1)
ctx, cancel = context.WithCancel(context.Background()) out <- err
defer cancel()
select {
case z := <-obj.startDequeuer(ctx):
t.Fatalf("%v we shouldn't get anything from queue %#v", id, z)
case <-time.After(time.Duration(500) * time.Millisecond):
}
}(i) }(i)
} }
// let's cancel after destroy this time
cancel()
wg.Wait() wg.Wait()
select { deadlineErrors := 0
case z := <-outChan: for i := 0; i < goMax; i += 1 {
t.Fatalf("Should not get anything from queue: %#v", z) err := <-out
case <-time.After(time.Duration(500) * time.Millisecond): if err == context.DeadlineExceeded {
deadlineErrors++
} else if err == nil {
t.Fatalf("Unexpected success")
} else {
t.Fatalf("Unexpected error: %s", err.Error())
}
} }
// both should be closed if deadlineErrors != goMax {
if !slot1.(*testSlot).isClosed { t.Fatalf("Expected %d got %d deadline exceeded errors", goMax, deadlineErrors)
t.Fatalf("item1 should be closed")
} }
if !slot2.(*testSlot).isClosed {
t.Fatalf("item2 should be closed") err = checkGetTokenId(t, obj, timeout, 2)
if err != context.DeadlineExceeded {
t.Fatalf(err.Error())
} }
} }

View File

@@ -30,7 +30,7 @@ docker run -e VAR_NAME=VALUE ...
| `FN_LOG_PREFIX` | If supplying a syslog url in `FN_LOG_DEST`, a prefix to add to each log line | `FN_LOG_PREFIX` | If supplying a syslog url in `FN_LOG_DEST`, a prefix to add to each log line
| `FN_API_CORS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None | | `FN_API_CORS` | A comma separated list of URLs to enable [CORS](https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS) for (or `*` for all domains). This corresponds to the allowed origins in the `Acccess-Control-Allow-Origin` header. | None |
| `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 | | `FN_FREEZE_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before pausing/freezing an idle hot container. Set to 0 to freeze idle containers without any delay. Set to negative integer to disable freeze/pause of idle hot containers. | 50 |
`FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time to wait in milliseconds before attempting to terminate an idle hot container when the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 | `FN_EJECT_IDLE_MSECS` | Set this option to specify the amount of time in milliseconds to periodically check to terminate an idle hot container if the system is starved for CPU and Memory resources. Set to negative integer to disable this feature. | 1000 |
| `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock | | `DOCKER_HOST` | Docker remote API URL. | /var/run/docker.sock |
| `DOCKER_API_VERSION` | Docker remote API version. | 1.24 | | `DOCKER_API_VERSION` | Docker remote API version. | 1.24 |
| `DOCKER_TLS_VERIFY` | Set this option to enable/disable Docker remote API over TLS/SSL. | 0 | | `DOCKER_TLS_VERIFY` | Set this option to enable/disable Docker remote API over TLS/SSL. | 0 |