mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Slot mgr fixes (#613)
*) during shutdown, errors should be 503 *) new inactivity time out for hot queue, we previously kept hot queues in memory forever. *) each hot queue now has a hot launcher to monitor and launch hot containers *) consumers now create a consumer channel with startDequeuer() that can be cancelled via context *) consumers now ping (signal) hot launcher every 200 msecs until they get a slot *) tests for slot queue & mgr
This commit is contained in:
@@ -2,7 +2,6 @@ package agent
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -188,7 +187,7 @@ func (a *agent) Submit(callI Call) error {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-a.shutdown:
|
case <-a.shutdown:
|
||||||
return errors.New("agent shut down")
|
return models.ErrCallTimeoutServerBusy
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -266,74 +265,103 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
|
|||||||
|
|
||||||
isHot := protocol.IsStreamable(protocol.Protocol(call.Format))
|
isHot := protocol.IsStreamable(protocol.Protocol(call.Format))
|
||||||
if isHot {
|
if isHot {
|
||||||
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
|
|
||||||
call.slots = a.slotMgr.getHotSlotQueue(call)
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
call.slots.enterState(SlotQueueWaiter)
|
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
|
||||||
s, err := a.launchHot(ctx, call)
|
var isNew bool
|
||||||
call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000))
|
call.slots, isNew = a.slotMgr.getSlotQueue(call)
|
||||||
|
if isNew {
|
||||||
|
go a.hotLauncher(ctx, call)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := a.waitHot(ctx, call)
|
||||||
|
call.slots.exitStateWithLatency(SlotQueueWaiter, uint64(time.Now().Sub(start).Seconds()*1000))
|
||||||
return s, err
|
return s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return a.launchCold(ctx, call)
|
return a.launchCold(ctx, call)
|
||||||
}
|
}
|
||||||
|
|
||||||
// launchHot checks with slot queue to see if a new container needs to be launched and waits
|
// hotLauncher is spawned in a go routine for each slot queue to monitor stats and launch hot
|
||||||
// for available slots in the queue for hot request execution.
|
// containers if needed. Upon shutdown or activity timeout, hotLauncher exits and during exit,
|
||||||
func (a *agent) launchHot(ctx context.Context, call *call) (Slot, error) {
|
// it destroys the slot queue.
|
||||||
|
func (a *agent) hotLauncher(ctx context.Context, callObj *call) {
|
||||||
|
|
||||||
isAsync := call.Type == models.TypeAsync
|
// Let use 60 minutes or 2 * IdleTimeout as hot queue idle timeout, pick
|
||||||
|
// whichever is longer. If in this time, there's no activity, then
|
||||||
|
// we destroy the hot queue.
|
||||||
|
timeout := time.Duration(60) * time.Minute
|
||||||
|
idleTimeout := time.Duration(callObj.IdleTimeout) * time.Second * 2
|
||||||
|
if timeout < idleTimeout {
|
||||||
|
timeout = idleTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := common.Logger(ctx)
|
||||||
|
logger.WithField("launcher_timeout", timeout).Info("Hot function launcher starting")
|
||||||
|
isAsync := callObj.Type == models.TypeAsync
|
||||||
|
|
||||||
launchLoop:
|
|
||||||
for {
|
for {
|
||||||
// Check/evaluate if we need to launch a new hot container
|
select {
|
||||||
doLaunch, stats := call.slots.isNewContainerNeeded()
|
case <-a.shutdown: // server shutdown
|
||||||
common.Logger(ctx).WithField("stats", stats).Debug("checking hot container launch ", doLaunch)
|
return
|
||||||
|
case <-time.After(timeout):
|
||||||
if doLaunch {
|
if a.slotMgr.deleteSlotQueue(callObj.slots) {
|
||||||
ctxToken, tokenCancel := context.WithCancel(context.Background())
|
logger.Info("Hot function launcher timed out")
|
||||||
|
return
|
||||||
// wait on token/slot/timeout whichever comes first
|
|
||||||
select {
|
|
||||||
case tok, isOpen := <-a.resources.GetResourceToken(ctxToken, call.Memory, isAsync):
|
|
||||||
tokenCancel()
|
|
||||||
if !isOpen {
|
|
||||||
return nil, models.ErrCallTimeoutServerBusy
|
|
||||||
}
|
|
||||||
|
|
||||||
a.wg.Add(1)
|
|
||||||
go a.runHot(ctx, call, tok)
|
|
||||||
case s, ok := <-call.slots.getDequeueChan():
|
|
||||||
tokenCancel()
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("slot shut down while waiting for hot slot")
|
|
||||||
}
|
|
||||||
if s.acquireSlot() {
|
|
||||||
if s.slot.Error() != nil {
|
|
||||||
s.slot.Close()
|
|
||||||
return nil, s.slot.Error()
|
|
||||||
}
|
|
||||||
return s.slot, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// we failed to take ownership of the token (eg. container idle timeout)
|
|
||||||
// try launching again
|
|
||||||
continue launchLoop
|
|
||||||
case <-ctx.Done():
|
|
||||||
tokenCancel()
|
|
||||||
return nil, ctx.Err()
|
|
||||||
}
|
}
|
||||||
|
case <-callObj.slots.signaller:
|
||||||
}
|
}
|
||||||
|
|
||||||
// After launching (if it was necessary) a container, now wait for slot/timeout
|
isNeeded, stats := callObj.slots.isNewContainerNeeded()
|
||||||
// or periodically reevaluate the launchHot() logic from beginning.
|
logger.WithField("stats", stats).Debug("Hot function launcher stats")
|
||||||
|
if !isNeeded {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
resourceCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
logger.WithField("stats", stats).Info("Hot function launcher starting hot container")
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s, ok := <-call.slots.getDequeueChan():
|
case tok, isOpen := <-a.resources.GetResourceToken(resourceCtx, callObj.Memory, isAsync):
|
||||||
if !ok {
|
cancel()
|
||||||
return nil, errors.New("slot shut down while waiting for hot slot")
|
if isOpen {
|
||||||
|
a.wg.Add(1)
|
||||||
|
go func(ctx context.Context, call *call, tok ResourceToken) {
|
||||||
|
a.runHot(ctx, call, tok)
|
||||||
|
a.wg.Done()
|
||||||
|
}(ctx, callObj, tok)
|
||||||
|
} else {
|
||||||
|
// this means the resource was impossible to reserve (eg. memory size we can never satisfy)
|
||||||
|
callObj.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: models.ErrCallTimeoutServerBusy})
|
||||||
}
|
}
|
||||||
|
case <-time.After(timeout):
|
||||||
|
cancel()
|
||||||
|
if a.slotMgr.deleteSlotQueue(callObj.slots) {
|
||||||
|
logger.Info("Hot function launcher timed out")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-a.shutdown: // server shutdown
|
||||||
|
cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitHot pings and waits for a hot container from the slot queue
|
||||||
|
func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
|
||||||
|
|
||||||
|
ch, cancel := call.slots.startDequeuer(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
// send a notification to launcHot()
|
||||||
|
select {
|
||||||
|
case call.slots.signaller <- true:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case s := <-ch:
|
||||||
if s.acquireSlot() {
|
if s.acquireSlot() {
|
||||||
if s.slot.Error() != nil {
|
if s.slot.Error() != nil {
|
||||||
s.slot.Close()
|
s.slot.Close()
|
||||||
@@ -341,13 +369,13 @@ launchLoop:
|
|||||||
}
|
}
|
||||||
return s.slot, nil
|
return s.slot, nil
|
||||||
}
|
}
|
||||||
|
// we failed to take ownership of the token (eg. container idle timeout) => try again
|
||||||
// we failed to take ownership of the token (eg. container idle timeout)
|
|
||||||
// try launching again
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
case <-time.After(time.Duration(200) * time.Millisecond):
|
case <-time.After(time.Duration(200) * time.Millisecond):
|
||||||
// reevaluate
|
// ping dequeuer again
|
||||||
|
case <-a.shutdown: // server shutdown
|
||||||
|
return nil, models.ErrCallTimeoutServerBusy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -506,7 +534,6 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
|||||||
|
|
||||||
func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
|
func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) {
|
||||||
// We must be careful to only use ctxArg for logs/spans
|
// We must be careful to only use ctxArg for logs/spans
|
||||||
defer a.wg.Done()
|
|
||||||
|
|
||||||
// create a span from ctxArg but ignore the new Context
|
// create a span from ctxArg but ignore the new Context
|
||||||
// instead we will create a new Context below and explicitly set its span
|
// instead we will create a new Context below and explicitly set its span
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ type Slot interface {
|
|||||||
|
|
||||||
// slotQueueMgr manages hot container slotQueues
|
// slotQueueMgr manages hot container slotQueues
|
||||||
type slotQueueMgr struct {
|
type slotQueueMgr struct {
|
||||||
hMu sync.RWMutex // protects hot
|
hMu sync.Mutex // protects hot
|
||||||
hot map[string]*slotQueue
|
hot map[string]*slotQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,8 +57,7 @@ type slotQueue struct {
|
|||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
slots []*slotToken
|
slots []*slotToken
|
||||||
nextId uint64
|
nextId uint64
|
||||||
output chan *slotToken
|
signaller chan bool
|
||||||
isClosed bool
|
|
||||||
statsLock sync.Mutex // protects stats below
|
statsLock sync.Mutex // protects stats below
|
||||||
stats slotQueueStats
|
stats slotQueueStats
|
||||||
}
|
}
|
||||||
@@ -72,49 +71,12 @@ func NewSlotQueueMgr() *slotQueueMgr {
|
|||||||
|
|
||||||
func NewSlotQueue(key string) *slotQueue {
|
func NewSlotQueue(key string) *slotQueue {
|
||||||
obj := &slotQueue{
|
obj := &slotQueue{
|
||||||
key: key,
|
key: key,
|
||||||
cond: sync.NewCond(new(sync.Mutex)),
|
cond: sync.NewCond(new(sync.Mutex)),
|
||||||
slots: make([]*slotToken, 0),
|
slots: make([]*slotToken, 0),
|
||||||
output: make(chan *slotToken),
|
signaller: make(chan bool, 1),
|
||||||
}
|
}
|
||||||
|
|
||||||
// producer go routine to pick LIFO slots and
|
|
||||||
// push them into output channel
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
obj.cond.L.Lock()
|
|
||||||
for len(obj.slots) <= 0 && !obj.isClosed {
|
|
||||||
obj.cond.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanup and exit
|
|
||||||
if obj.isClosed {
|
|
||||||
|
|
||||||
purge := obj.slots
|
|
||||||
obj.slots = obj.slots[:0]
|
|
||||||
obj.cond.L.Unlock()
|
|
||||||
|
|
||||||
close(obj.output)
|
|
||||||
|
|
||||||
for _, val := range purge {
|
|
||||||
if val.acquireSlot() {
|
|
||||||
val.slot.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// pop
|
|
||||||
item := obj.slots[len(obj.slots)-1]
|
|
||||||
obj.slots = obj.slots[:len(obj.slots)-1]
|
|
||||||
obj.cond.L.Unlock()
|
|
||||||
|
|
||||||
// block
|
|
||||||
obj.output <- item
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -135,19 +97,13 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
isFound := false
|
|
||||||
|
|
||||||
a.cond.L.Lock()
|
a.cond.L.Lock()
|
||||||
for idx, val := range a.slots {
|
for i := 0; i < len(a.slots); i++ {
|
||||||
if val.id == s.id {
|
if a.slots[i].id == s.id {
|
||||||
a.slots[0], a.slots[idx] = a.slots[idx], a.slots[0]
|
a.slots = append(a.slots[:i], a.slots[i+1:]...)
|
||||||
isFound = true
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if isFound {
|
|
||||||
a.slots = a.slots[1:]
|
|
||||||
}
|
|
||||||
a.cond.L.Unlock()
|
a.cond.L.Unlock()
|
||||||
|
|
||||||
s.slot.Close()
|
s.slot.Close()
|
||||||
@@ -156,44 +112,73 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *slotQueue) destroySlotQueue() {
|
func (a *slotQueue) startDequeuer(ctx context.Context) (chan *slotToken, context.CancelFunc) {
|
||||||
doSignal := false
|
|
||||||
a.cond.L.Lock()
|
|
||||||
if !a.isClosed {
|
|
||||||
a.isClosed = true
|
|
||||||
doSignal = true
|
|
||||||
}
|
|
||||||
a.cond.L.Unlock()
|
|
||||||
if doSignal {
|
|
||||||
a.cond.Signal()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *slotQueue) getDequeueChan() chan *slotToken {
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
return a.output
|
|
||||||
|
myCancel := func() {
|
||||||
|
cancel()
|
||||||
|
a.cond.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
output := make(chan *slotToken)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
a.cond.L.Lock()
|
||||||
|
for len(a.slots) <= 0 && (ctx.Err() == nil) {
|
||||||
|
a.cond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
a.cond.L.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// pop
|
||||||
|
item := a.slots[len(a.slots)-1]
|
||||||
|
a.slots = a.slots[:len(a.slots)-1]
|
||||||
|
a.cond.L.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case output <- item: // good case (dequeued)
|
||||||
|
case <-item.trigger: // ejected (eject handles cleanup)
|
||||||
|
case <-ctx.Done(): // time out or cancel from caller
|
||||||
|
// consume slot, we let the hot container queue the slot again
|
||||||
|
if item.acquireSlot() {
|
||||||
|
item.slot.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return output, myCancel
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *slotQueue) queueSlot(slot Slot) *slotToken {
|
func (a *slotQueue) queueSlot(slot Slot) *slotToken {
|
||||||
|
|
||||||
token := &slotToken{slot, make(chan struct{}), 0, 0}
|
token := &slotToken{slot, make(chan struct{}), 0, 0}
|
||||||
isClosed := false
|
|
||||||
|
|
||||||
a.cond.L.Lock()
|
a.cond.L.Lock()
|
||||||
if !a.isClosed {
|
token.id = a.nextId
|
||||||
token.id = a.nextId
|
a.slots = append(a.slots, token)
|
||||||
a.slots = append(a.slots, token)
|
a.nextId += 1
|
||||||
a.nextId += 1
|
|
||||||
} else {
|
|
||||||
isClosed = true
|
|
||||||
}
|
|
||||||
a.cond.L.Unlock()
|
a.cond.L.Unlock()
|
||||||
|
|
||||||
if !isClosed {
|
a.cond.Broadcast()
|
||||||
a.cond.Signal()
|
return token
|
||||||
return token
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
// isIdle() returns true is there's no activity for this slot queue. This
|
||||||
|
// means no one is waiting, running or starting.
|
||||||
|
func (a *slotQueue) isIdle() bool {
|
||||||
|
var partySize uint64
|
||||||
|
|
||||||
|
a.statsLock.Lock()
|
||||||
|
partySize = a.stats.states[SlotQueueWaiter] + a.stats.states[SlotQueueStarter] + a.stats.states[SlotQueueRunner]
|
||||||
|
a.statsLock.Unlock()
|
||||||
|
|
||||||
|
return partySize == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *slotQueue) getStats() slotQueueStats {
|
func (a *slotQueue) getStats() slotQueueStats {
|
||||||
@@ -296,32 +281,35 @@ func (a *slotQueue) exitStateWithLatency(metricIdx SlotQueueMetricType, latency
|
|||||||
|
|
||||||
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
// getSlot must ensure that if it receives a slot, it will be returned, otherwise
|
||||||
// a container will be locked up forever waiting for slot to free.
|
// a container will be locked up forever waiting for slot to free.
|
||||||
func (a *slotQueueMgr) getHotSlotQueue(call *call) *slotQueue {
|
func (a *slotQueueMgr) getSlotQueue(call *call) (*slotQueue, bool) {
|
||||||
|
|
||||||
key := getSlotQueueKey(call)
|
key := getSlotQueueKey(call)
|
||||||
|
|
||||||
a.hMu.RLock()
|
a.hMu.Lock()
|
||||||
slots, ok := a.hot[key]
|
slots, ok := a.hot[key]
|
||||||
a.hMu.RUnlock()
|
|
||||||
if !ok {
|
if !ok {
|
||||||
a.hMu.Lock()
|
slots = NewSlotQueue(key)
|
||||||
slots, ok = a.hot[key]
|
a.hot[key] = slots
|
||||||
if !ok {
|
|
||||||
slots = NewSlotQueue(key)
|
|
||||||
a.hot[key] = slots
|
|
||||||
}
|
|
||||||
a.hMu.Unlock()
|
|
||||||
}
|
}
|
||||||
return slots
|
slots.enterState(SlotQueueWaiter)
|
||||||
|
a.hMu.Unlock()
|
||||||
|
|
||||||
|
return slots, !ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// currently unused. But at some point, we need to age/delete old
|
// currently unused. But at some point, we need to age/delete old
|
||||||
// slotQueues.
|
// slotQueues.
|
||||||
func (a *slotQueueMgr) destroySlotQueue(slots *slotQueue) {
|
func (a *slotQueueMgr) deleteSlotQueue(slots *slotQueue) bool {
|
||||||
slots.destroySlotQueue()
|
isDeleted := false
|
||||||
|
|
||||||
a.hMu.Lock()
|
a.hMu.Lock()
|
||||||
delete(a.hot, slots.key)
|
if slots.isIdle() {
|
||||||
|
delete(a.hot, slots.key)
|
||||||
|
isDeleted = true
|
||||||
|
}
|
||||||
a.hMu.Unlock()
|
a.hMu.Unlock()
|
||||||
|
|
||||||
|
return isDeleted
|
||||||
}
|
}
|
||||||
|
|
||||||
func getSlotQueueKey(call *call) string {
|
func getSlotQueueKey(call *call) string {
|
||||||
|
|||||||
280
api/agent/slots_test.go
Normal file
280
api/agent/slots_test.go
Normal file
@@ -0,0 +1,280 @@
|
|||||||
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testSlot struct {
|
||||||
|
id uint64
|
||||||
|
err error
|
||||||
|
isClosed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *testSlot) exec(ctx context.Context, call *call) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *testSlot) Close() error {
|
||||||
|
if a.isClosed {
|
||||||
|
panic(fmt.Errorf("id=%d already closed %v", a.id, a))
|
||||||
|
}
|
||||||
|
a.isClosed = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *testSlot) Error() error {
|
||||||
|
return a.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestSlot(id uint64) Slot {
|
||||||
|
mySlot := &testSlot{
|
||||||
|
id: id,
|
||||||
|
}
|
||||||
|
return mySlot
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlotQueueBasic1(t *testing.T) {
|
||||||
|
|
||||||
|
maxId := uint64(10)
|
||||||
|
slotName := "test1"
|
||||||
|
|
||||||
|
slots := make([]Slot, 0, maxId)
|
||||||
|
tokens := make([]*slotToken, 0, maxId)
|
||||||
|
|
||||||
|
obj := NewSlotQueue(slotName)
|
||||||
|
|
||||||
|
outChan, cancel := obj.startDequeuer(context.Background())
|
||||||
|
select {
|
||||||
|
case z := <-outChan:
|
||||||
|
t.Fatalf("Should not get anything from queue: %#v", z)
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// create slots
|
||||||
|
for id := uint64(0); id < maxId; id += 1 {
|
||||||
|
slots = append(slots, NewTestSlot(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
// queue a few slots here
|
||||||
|
for id := uint64(0); id < maxId; id += 1 {
|
||||||
|
tok := obj.queueSlot(slots[id])
|
||||||
|
|
||||||
|
innerTok := tok.slot.(*testSlot)
|
||||||
|
|
||||||
|
// check for slot id match
|
||||||
|
if innerTok != slots[id] {
|
||||||
|
t.Fatalf("queued testSlot does not match with slotToken.slot %#v vs %#v", innerTok, slots[id])
|
||||||
|
}
|
||||||
|
|
||||||
|
tokens = append(tokens, tok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now.
|
||||||
|
// but let's eject 9
|
||||||
|
if !obj.ejectSlot(tokens[9]) {
|
||||||
|
t.Fatalf("Cannot eject slotToken: %#v", tokens[9])
|
||||||
|
}
|
||||||
|
// let eject 0
|
||||||
|
if !obj.ejectSlot(tokens[0]) {
|
||||||
|
t.Fatalf("Cannot eject slotToken: %#v", tokens[0])
|
||||||
|
}
|
||||||
|
// let eject 5
|
||||||
|
if !obj.ejectSlot(tokens[5]) {
|
||||||
|
t.Fatalf("Cannot eject slotToken: %#v", tokens[5])
|
||||||
|
}
|
||||||
|
// try ejecting 5 again, it should fail
|
||||||
|
if obj.ejectSlot(tokens[5]) {
|
||||||
|
t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5])
|
||||||
|
}
|
||||||
|
|
||||||
|
outChan, cancel = obj.startDequeuer(context.Background())
|
||||||
|
|
||||||
|
// now we should get 8
|
||||||
|
select {
|
||||||
|
case z := <-outChan:
|
||||||
|
if z.id != 8 {
|
||||||
|
t.Fatalf("Bad slotToken received: %#v", z)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !z.acquireSlot() {
|
||||||
|
t.Fatalf("Cannot acquire slotToken received: %#v", z)
|
||||||
|
}
|
||||||
|
|
||||||
|
// second acquire shoudl fail
|
||||||
|
if z.acquireSlot() {
|
||||||
|
t.Fatalf("Should not be able to acquire twice slotToken: %#v", z)
|
||||||
|
}
|
||||||
|
|
||||||
|
z.slot.Close()
|
||||||
|
|
||||||
|
case <-time.After(time.Duration(1) * time.Second):
|
||||||
|
t.Fatal("timeout in waiting slotToken")
|
||||||
|
}
|
||||||
|
|
||||||
|
// now we should get 7
|
||||||
|
select {
|
||||||
|
case z := <-outChan:
|
||||||
|
if z.id != 7 {
|
||||||
|
t.Fatalf("Bad slotToken received: %#v", z)
|
||||||
|
}
|
||||||
|
|
||||||
|
// eject it before we can consume
|
||||||
|
if !obj.ejectSlot(tokens[7]) {
|
||||||
|
t.Fatalf("Cannot eject slotToken: %#v", tokens[2])
|
||||||
|
}
|
||||||
|
|
||||||
|
// we shouldn't be able to consume an ejected slotToken
|
||||||
|
if z.acquireSlot() {
|
||||||
|
t.Fatalf("We should not be able to acquire slotToken received: %#v", z)
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-time.After(time.Duration(1) * time.Second):
|
||||||
|
t.Fatal("timeout in waiting slotToken")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// we should get nothing or 6
|
||||||
|
select {
|
||||||
|
case z, ok := <-outChan:
|
||||||
|
if ok {
|
||||||
|
if z.id != 6 {
|
||||||
|
t.Fatalf("Should not get anything except for 6 from queue: %#v", z)
|
||||||
|
}
|
||||||
|
if !z.acquireSlot() {
|
||||||
|
t.Fatalf("cannot acquire token: %#v", z)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
stats1 := obj.getStats()
|
||||||
|
isNeeded, stats2 := obj.isNewContainerNeeded()
|
||||||
|
|
||||||
|
if stats1 != stats2 {
|
||||||
|
t.Fatalf("Faulty stats %#v != %#v", stats1, stats2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// there are no waiters.
|
||||||
|
if isNeeded {
|
||||||
|
t.Fatalf("Shouldn't need a container")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlotQueueBasic2(t *testing.T) {
|
||||||
|
|
||||||
|
obj := NewSlotQueue("test2")
|
||||||
|
|
||||||
|
if !obj.isIdle() {
|
||||||
|
t.Fatalf("Should be idle")
|
||||||
|
}
|
||||||
|
if ok, _ := obj.isNewContainerNeeded(); ok {
|
||||||
|
t.Fatalf("Should not need a new container")
|
||||||
|
}
|
||||||
|
|
||||||
|
outChan, cancel := obj.startDequeuer(context.Background())
|
||||||
|
select {
|
||||||
|
case z := <-outChan:
|
||||||
|
t.Fatalf("Should not get anything from queue: %#v", z)
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSlotQueueBasic3(t *testing.T) {
|
||||||
|
|
||||||
|
slotName := "test3"
|
||||||
|
|
||||||
|
obj := NewSlotQueue(slotName)
|
||||||
|
_, cancel1 := obj.startDequeuer(context.Background())
|
||||||
|
|
||||||
|
slot1 := NewTestSlot(1)
|
||||||
|
slot2 := NewTestSlot(2)
|
||||||
|
token1 := obj.queueSlot(slot1)
|
||||||
|
obj.queueSlot(slot2)
|
||||||
|
|
||||||
|
// now our slot must be ready in outChan, but let's cancel it
|
||||||
|
// to cause a requeue. This should cause [1, 2] ordering to [2, 1]
|
||||||
|
cancel1()
|
||||||
|
|
||||||
|
outChan, cancel2 := obj.startDequeuer(context.Background())
|
||||||
|
|
||||||
|
// we should get '2' since cancel1() reordered the queue
|
||||||
|
select {
|
||||||
|
case item, ok := <-outChan:
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("outChan should be open")
|
||||||
|
}
|
||||||
|
|
||||||
|
inner := item.slot.(*testSlot)
|
||||||
|
outer := slot2.(*testSlot)
|
||||||
|
|
||||||
|
if inner.id != outer.id {
|
||||||
|
t.Fatalf("item should be 2")
|
||||||
|
}
|
||||||
|
if inner.isClosed {
|
||||||
|
t.Fatalf("2 should not yet be closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !item.acquireSlot() {
|
||||||
|
t.Fatalf("2 acquire should not fail")
|
||||||
|
}
|
||||||
|
|
||||||
|
item.slot.Close()
|
||||||
|
|
||||||
|
case <-time.After(time.Duration(1) * time.Second):
|
||||||
|
t.Fatal("timeout in waiting slotToken")
|
||||||
|
}
|
||||||
|
|
||||||
|
// let's eject 1
|
||||||
|
if !obj.ejectSlot(token1) {
|
||||||
|
t.Fatalf("failed to eject 1")
|
||||||
|
}
|
||||||
|
if !slot1.(*testSlot).isClosed {
|
||||||
|
t.Fatalf("1 should be closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// spin up bunch of go routines, where each should get a non-acquirable
|
||||||
|
// token or timeout due the imminent obj.destroySlotQueue()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
goMax := 10
|
||||||
|
wg.Add(goMax)
|
||||||
|
for i := 0; i < goMax; i += 1 {
|
||||||
|
go func(id int) {
|
||||||
|
ch, cancl := obj.startDequeuer(context.Background())
|
||||||
|
defer cancl()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case z := <-ch:
|
||||||
|
t.Fatalf("%v we shouldn't get anything from queue %#v", id, z)
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// let's cancel after destroy this time
|
||||||
|
cancel2()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case z := <-outChan:
|
||||||
|
t.Fatalf("Should not get anything from queue: %#v", z)
|
||||||
|
case <-time.After(time.Duration(500) * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
// both should be closed
|
||||||
|
if !slot1.(*testSlot).isClosed {
|
||||||
|
t.Fatalf("item1 should be closed")
|
||||||
|
}
|
||||||
|
if !slot2.(*testSlot).isClosed {
|
||||||
|
t.Fatalf("item2 should be closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user