diff --git a/api/agent/agent.go b/api/agent/agent.go index 342ef26ef..060bf7589 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -248,7 +248,7 @@ func (a *agent) submit(ctx context.Context, call *call) error { return transformTimeout(err, true) } - defer slot.Close() // notify our slot is free once we're done + defer slot.Close(ctx) // notify our slot is free once we're done err = call.Start(ctx) if err != nil { @@ -447,7 +447,7 @@ func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) { case s := <-ch: if s.acquireSlot() { if s.slot.Error() != nil { - s.slot.Close() + s.slot.Close(ctx) return nil, s.slot.Error() } return s.slot, nil @@ -493,7 +493,7 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) { select { case s := <-ch: if s.Error() != nil { - s.Close() + s.Close(ctx) return nil, s.Error() } return s, nil @@ -537,11 +537,13 @@ func (s *coldSlot) exec(ctx context.Context, call *call) error { return ctx.Err() } -func (s *coldSlot) Close() error { +func (s *coldSlot) Close(ctx context.Context) error { if s.cookie != nil { // call this from here so that in exec we don't have to eat container // removal latency - s.cookie.Close(context.Background()) // ensure container removal, separate ctx + // NOTE ensure container removal, no ctx timeout + ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx)) + s.cookie.Close(ctx) } if s.tok != nil { s.tok.Close() @@ -557,7 +559,7 @@ type hotSlot struct { err error } -func (s *hotSlot) Close() error { +func (s *hotSlot) Close(ctx context.Context) error { close(s.done) return nil } @@ -643,7 +645,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch select { case ch <- slot: case <-ctx.Done(): - slot.Close() + slot.Close(ctx) } } @@ -689,7 +691,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state call.slots.queueSlot(&hotSlot{done: make(chan struct{}), err: err}) return } - defer cookie.Close(context.Background()) // ensure container removal, separate ctx + defer cookie.Close(ctx) // NOTE ensure this ctx doesn't time out waiter, err := cookie.Run(ctx) if err != nil { @@ -768,7 +770,7 @@ func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state // if we can eject token, that means we are here due to // abort/shutdown/timeout, attempt to eject and terminate, // otherwise continue processing the request - if call.slots.ejectSlot(s) { + if call.slots.ejectSlot(ctx, s) { return } diff --git a/api/agent/slots.go b/api/agent/slots.go index a71f0e820..6a361f875 100644 --- a/api/agent/slots.go +++ b/api/agent/slots.go @@ -17,7 +17,7 @@ import ( type Slot interface { exec(ctx context.Context, call *call) error - Close() error + Close(ctx context.Context) error Error() error } @@ -81,7 +81,7 @@ func (a *slotToken) acquireSlot() bool { return true } -func (a *slotQueue) ejectSlot(s *slotToken) bool { +func (a *slotQueue) ejectSlot(ctx context.Context, s *slotToken) bool { // let's get the lock if !atomic.CompareAndSwapUint32(&s.isBusy, 0, 1) { return false @@ -96,7 +96,7 @@ func (a *slotQueue) ejectSlot(s *slotToken) bool { } a.cond.L.Unlock() - s.slot.Close() + s.slot.Close(ctx) // now we have the lock, push the trigger close(s.trigger) return true @@ -142,7 +142,7 @@ func (a *slotQueue) startDequeuer(ctx context.Context) chan *slotToken { case <-ctx.Done(): // time out or cancel from caller // consume slot, we let the hot container queue the slot again if item.acquireSlot() { - item.slot.Close() + item.slot.Close(ctx) } } } diff --git a/api/agent/slots_test.go b/api/agent/slots_test.go index 694ae4304..14ad3e119 100644 --- a/api/agent/slots_test.go +++ b/api/agent/slots_test.go @@ -18,7 +18,7 @@ func (a *testSlot) exec(ctx context.Context, call *call) error { return nil } -func (a *testSlot) Close() error { +func (a *testSlot) Close(ctx context.Context) error { if a.isClosed { panic(fmt.Errorf("id=%d already closed %v", a.id, a)) } @@ -77,19 +77,19 @@ func TestSlotQueueBasic1(t *testing.T) { // Now according to LIFO semantics, we should get 9,8,7,6,5,4,3,2,1,0 if we dequeued right now. // but let's eject 9 - if !obj.ejectSlot(tokens[9]) { + if !obj.ejectSlot(ctx, tokens[9]) { t.Fatalf("Cannot eject slotToken: %#v", tokens[9]) } // let eject 0 - if !obj.ejectSlot(tokens[0]) { + if !obj.ejectSlot(ctx, tokens[0]) { t.Fatalf("Cannot eject slotToken: %#v", tokens[0]) } // let eject 5 - if !obj.ejectSlot(tokens[5]) { + if !obj.ejectSlot(ctx, tokens[5]) { t.Fatalf("Cannot eject slotToken: %#v", tokens[5]) } // try ejecting 5 again, it should fail - if obj.ejectSlot(tokens[5]) { + if obj.ejectSlot(ctx, tokens[5]) { t.Fatalf("Shouldn't be able to eject slotToken: %#v", tokens[5]) } @@ -112,7 +112,7 @@ func TestSlotQueueBasic1(t *testing.T) { t.Fatalf("Should not be able to acquire twice slotToken: %#v", z) } - z.slot.Close() + z.slot.Close(ctx) case <-time.After(time.Duration(1) * time.Second): t.Fatal("timeout in waiting slotToken") @@ -126,7 +126,7 @@ func TestSlotQueueBasic1(t *testing.T) { } // eject it before we can consume - if !obj.ejectSlot(tokens[7]) { + if !obj.ejectSlot(ctx, tokens[7]) { t.Fatalf("Cannot eject slotToken: %#v", tokens[2]) } @@ -263,14 +263,14 @@ func TestSlotQueueBasic3(t *testing.T) { t.Fatalf("2 acquire should not fail") } - item.slot.Close() + item.slot.Close(ctx) case <-time.After(time.Duration(1) * time.Second): t.Fatal("timeout in waiting slotToken") } // let's eject 1 - if !obj.ejectSlot(token1) { + if !obj.ejectSlot(ctx, token1) { t.Fatalf("failed to eject 1") } if !slot1.(*testSlot).isClosed {