mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
plumb ctx for container removal spanno (#750)
these were just dangling off on the side, took some plumbing work but not so bad
This commit is contained in:
committed by
Tolga Ceylan
parent
c36c627b93
commit
27179ddf54
@@ -248,7 +248,7 @@ func (a *agent) submit(ctx context.Context, call *call) error {
|
||||
return transformTimeout(err, true)
|
||||
}
|
||||
|
||||
defer slot.Close() // notify our slot is free once we're done
|
||||
defer slot.Close(ctx) // notify our slot is free once we're done
|
||||
|
||||
err = call.Start(ctx)
|
||||
if err != nil {
|
||||
@@ -447,7 +447,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
|
||||
case s := <-ch:
|
||||
if s.acquireSlot() {
|
||||
if s.slot.Error() != nil {
|
||||
s.slot.Close()
|
||||
s.slot.Close(ctx)
|
||||
return nil, s.slot.Error()
|
||||
}
|
||||
return s.slot, nil
|
||||
@@ -493,7 +493,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
|
||||
select {
|
||||
case s := <-ch:
|
||||
if s.Error() != nil {
|
||||
s.Close()
|
||||
s.Close(ctx)
|
||||
return nil, s.Error()
|
||||
}
|
||||
return s, nil
|
||||
@@ -537,11 +537,13 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *coldSlot) Close() error {
|
||||
func (s *coldSlot) Close(ctx context.Context) error {
|
||||
if s.cookie != nil {
|
||||
// call this from here so that in exec we don't have to eat container
|
||||
// removal latency
|
||||
s.cookie.Close(context.Background()) // ensure container removal, separate ctx
|
||||
// NOTE ensure container removal, no ctx timeout
|
||||
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
|
||||
s.cookie.Close(ctx)
|
||||
}
|
||||
if s.tok != nil {
|
||||
s.tok.Close()
|
||||
@@ -557,7 +559,7 @@ type hotSlot struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *hotSlot) Close() error {
|
||||
func (s *hotSlot) Close(ctx context.Context) error {
|
||||
close(s.done)
|
||||
return nil
|
||||
}
|
||||
@@ -643,7 +645,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
|
||||
select {
|
||||
case ch <- slot:
|
||||
case <-ctx.Done():
|
||||
slot.Close()
|
||||
slot.Close(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -689,7 +691,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err})
|
||||
return
|
||||
}
|
||||
defer cookie.Close(context.Background()) // ensure container removal, separate ctx
|
||||
defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out
|
||||
|
||||
waiter, err := cookie.Run(ctx)
|
||||
if err != nil {
|
||||
@@ -768,7 +770,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state
|
||||
// if we can eject token, that means we are here due to
|
||||
// abort/shutdown/timeout, attempt to eject and terminate,
|
||||
// otherwise continue processing the request
|
||||
if call.slots.ejectSlot(s) {
|
||||
if call.slots.ejectSlot(ctx, s) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
|
||||
type Slot interface {
|
||||
exec(ctx context.Context, call *call) error
|
||||
Close() error
|
||||
Close(ctx context.Context) error
|
||||
Error() error
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func (a *slotToken) acquireSlot() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *slotQueue) ejectSlot(s *slotToken) bool {
|
||||
func (a *slotQueue) ejectSlot(ctx context.Context, s *slotToken) bool {
|
||||
// let's get the lock
|
||||
if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) {
|
||||
return false
|
||||
@@ -96,7 +96,7 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool {
|
||||
}
|
||||
a.cond.L.Unlock()
|
||||
|
||||
s.slot.Close()
|
||||
s.slot.Close(ctx)
|
||||
// now we have the lock, push the trigger
|
||||
close(s.trigger)
|
||||
return true
|
||||
@@ -142,7 +142,7 @@ func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken {
|
||||
case <-ctx.Done(): // time out or cancel from caller
|
||||
// consume slot, we let the hot container queue the slot again
|
||||
if item.acquireSlot() {
|
||||
item.slot.Close()
|
||||
item.slot.Close(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ func (a *testSlot) exec(ctx context.Context, call *call) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *testSlot) Close() error {
|
||||
func (a *testSlot) Close(ctx context.Context) error {
|
||||
if a.isClosed {
|
||||
panic(fmt.Errorf("id=%d already closed %v", a.id, a))
|
||||
}
|
||||
@@ -77,19 +77,19 @@ func TestSlotQueueBasic1(t *testing.T) {
|
||||
|
||||
// Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now.
|
||||
// but let's eject 9
|
||||
if !obj.ejectSlot(tokens[9]) {
|
||||
if !obj.ejectSlot(ctx, tokens[9]) {
|
||||
t.Fatalf("Cannot eject slotToken: %#v", tokens[9])
|
||||
}
|
||||
// let eject 0
|
||||
if !obj.ejectSlot(tokens[0]) {
|
||||
if !obj.ejectSlot(ctx, tokens[0]) {
|
||||
t.Fatalf("Cannot eject slotToken: %#v", tokens[0])
|
||||
}
|
||||
// let eject 5
|
||||
if !obj.ejectSlot(tokens[5]) {
|
||||
if !obj.ejectSlot(ctx, tokens[5]) {
|
||||
t.Fatalf("Cannot eject slotToken: %#v", tokens[5])
|
||||
}
|
||||
// try ejecting 5 again, it should fail
|
||||
if obj.ejectSlot(tokens[5]) {
|
||||
if obj.ejectSlot(ctx, tokens[5]) {
|
||||
t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5])
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ func TestSlotQueueBasic1(t *testing.T) {
|
||||
t.Fatalf("Should not be able to acquire twice slotToken: %#v", z)
|
||||
}
|
||||
|
||||
z.slot.Close()
|
||||
z.slot.Close(ctx)
|
||||
|
||||
case <-time.After(time.Duration(1) * time.Second):
|
||||
t.Fatal("timeout in waiting slotToken")
|
||||
@@ -126,7 +126,7 @@ func TestSlotQueueBasic1(t *testing.T) {
|
||||
}
|
||||
|
||||
// eject it before we can consume
|
||||
if !obj.ejectSlot(tokens[7]) {
|
||||
if !obj.ejectSlot(ctx, tokens[7]) {
|
||||
t.Fatalf("Cannot eject slotToken: %#v", tokens[2])
|
||||
}
|
||||
|
||||
@@ -263,14 +263,14 @@ func TestSlotQueueBasic3(t *testing.T) {
|
||||
t.Fatalf("2 acquire should not fail")
|
||||
}
|
||||
|
||||
item.slot.Close()
|
||||
item.slot.Close(ctx)
|
||||
|
||||
case <-time.After(time.Duration(1) * time.Second):
|
||||
t.Fatal("timeout in waiting slotToken")
|
||||
}
|
||||
|
||||
// let's eject 1
|
||||
if !obj.ejectSlot(token1) {
|
||||
if !obj.ejectSlot(ctx, token1) {
|
||||
t.Fatalf("failed to eject 1")
|
||||
}
|
||||
if !slot1.(*testSlot).isClosed {
|
||||
|
||||
Reference in New Issue
Block a user