From feeeca332135c1ab51775f8acdd43c0988316f41 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Fri, 22 Dec 2017 12:52:31 -0800 Subject: [PATCH] fn: agent shutdown improvements (#622) --- api/agent/agent.go | 13 +++++++++---- api/agent/async.go | 1 - api/server/runner_test.go | 1 + 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index d699ce163..13e70827e 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -118,6 +118,7 @@ type agent struct { // used to track running calls / safe shutdown wg sync.WaitGroup // TODO rename + shutonce sync.Once shutdown chan struct{} stats // TODO kill me @@ -140,6 +141,7 @@ func New(da DataAccess) Agent { } // TODO assert that agent doesn't get started for API nodes up above ? + a.wg.Add(1) go a.asyncDequeue() // safe shutdown can nanny this fine return a @@ -151,11 +153,11 @@ func (a *agent) Enqueue(ctx context.Context, call *models.Call) error { } func (a *agent) Close() error { - select { - case <-a.shutdown: - default: + + a.shutonce.Do(func() { close(a.shutdown) - } + }) + a.wg.Wait() return nil } @@ -300,6 +302,8 @@ launchLoop: if !isOpen { return nil, models.ErrCallTimeoutServerBusy } + + a.wg.Add(1) go a.runHot(ctx, call, tok) case s, ok := <-call.slots.getDequeueChan(): tokenCancel() @@ -502,6 +506,7 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch func (a *agent) runHot(ctxArg context.Context, call *call, tok ResourceToken) { // We must be careful to only use ctxArg for logs/spans + defer a.wg.Done() // create a span from ctxArg but ignore the new Context // instead we will create a new Context below and explicitly set its span diff --git a/api/agent/async.go b/api/agent/async.go index f160db83b..9db9277d3 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -9,7 +9,6 @@ import ( ) func (a *agent) asyncDequeue() { - a.wg.Add(1) defer a.wg.Done() // we can treat this thread like one big task and get safe shutdown fo free // this is just so we can hang up the dequeue request if we get shut down diff --git a/api/server/runner_test.go b/api/server/runner_test.go index 4fa9c0e74..9e66e81e6 100644 --- a/api/server/runner_test.go +++ b/api/server/runner_test.go @@ -255,6 +255,7 @@ func TestRouteRunnerTimeout(t *testing.T) { fnl := logs.NewMock() srv := testServer(ds, &mqs.Mock{}, fnl, rnr, ServerTypeFull) + defer srv.agent.Close() for i, test := range []struct { path string