From 623aeb35b2914975ce1e9359c328998bda40a5fe Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Thu, 12 Apr 2018 16:21:13 -0700 Subject: [PATCH] fn: common.WaitGroup improvements (#940) * fn: common.WaitGroup improvements *) Split the API into AddSession/DoneSession *) Only wake up listeners when session count reaches zero. * fn: WaitGroup go-routine blast test * fn: test fix and rebase fixup --- api/agent/agent.go | 6 +-- api/agent/async.go | 6 +-- api/agent/lb_agent.go | 4 +- api/agent/runner_client.go | 2 +- api/common/wait_utils.go | 70 +++++++++++++++++------------------ api/common/wait_utils_test.go | 62 +++++++++++++++++++++++++------ 6 files changed, 94 insertions(+), 56 deletions(-) diff --git a/api/agent/agent.go b/api/agent/agent.go index d2a0463ce..4f936a95a 100644 --- a/api/agent/agent.go +++ b/api/agent/agent.go @@ -258,7 +258,7 @@ func (a *agent) scheduleCallEnd(fn func()) { go func() { fn() atomic.AddInt64(&a.callEndCount, -1) - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() }() } @@ -266,7 +266,7 @@ func (a *agent) finalizeCallEnd(ctx context.Context, err error, isRetriable, isS // if scheduled in background, let scheduleCallEnd() handle // the shutWg group, otherwise decrement here. if !isScheduled { - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() } handleStatsEnd(ctx, err) return transformTimeout(err, isRetriable) @@ -442,7 +442,7 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) { go func() { // NOTE: runHot will not inherit the timeout from ctx (ignore timings) a.runHot(ctx, call, tok, state) - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() }() return } diff --git a/api/agent/async.go b/api/agent/async.go index a26bbb6f4..8b7f08b0f 100644 --- a/api/agent/async.go +++ b/api/agent/async.go @@ -23,7 +23,7 @@ func (a *agent) asyncDequeue() { for { select { case <-a.shutWg.Closer(): - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() return case <-a.resources.WaitAsyncResource(ctx): // TODO we _could_ return a token here to reserve the ram so that there's @@ -35,13 +35,13 @@ func (a *agent) asyncDequeue() { // we think we can get a cookie now, so go get a cookie select { case <-a.shutWg.Closer(): - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() return case model, ok := <-a.asyncChew(ctx): if ok { go func(model *models.Call) { a.asyncRun(ctx, model) - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() }(model) // WARNING: tricky. We reserve another session for next iteration of the loop diff --git a/api/agent/lb_agent.go b/api/agent/lb_agent.go index babdcbd37..c66754355 100644 --- a/api/agent/lb_agent.go +++ b/api/agent/lb_agent.go @@ -192,7 +192,7 @@ func (a *lbAgent) scheduleCallEnd(fn func()) { go func() { fn() atomic.AddInt64(&a.callEndCount, -1) - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() }() } @@ -209,7 +209,7 @@ func (a *lbAgent) handleCallEnd(ctx context.Context, call *call, err error, isSt return transformTimeout(err, false) } - a.shutWg.AddSession(-1) + a.shutWg.DoneSession() handleStatsDequeue(ctx, err) return transformTimeout(err, true) } diff --git a/api/agent/runner_client.go b/api/agent/runner_client.go index 7a6819426..c05430dae 100644 --- a/api/agent/runner_client.go +++ b/api/agent/runner_client.go @@ -94,7 +94,7 @@ func (r *gRPCRunner) TryExec(ctx context.Context, call pool.RunnerCall) (bool, e if !r.shutWg.AddSession(1) { return true, ErrorRunnerClosed } - defer r.shutWg.AddSession(-1) + defer r.shutWg.DoneSession() // extract the call's model data to pass on to the pure runner modelJSON, err := json.Marshal(call.Model()) diff --git a/api/common/wait_utils.go b/api/common/wait_utils.go index 0fa544f5c..7eff40d9c 100644 --- a/api/common/wait_utils.go +++ b/api/common/wait_utils.go @@ -9,20 +9,19 @@ import ( /* WaitGroup is used to manage and wait for a collection of sessions. It is similar to sync.WaitGroup, but - AddSession/CloseGroup session is not only thread + AddSession/DoneSession/CloseGroup session is not only thread safe but can be executed in any order unlike sync.WaitGroup. - Once a shutdown is initiated via CloseGroup(), add/rm + Once a shutdown is initiated via CloseGroup(), add/done operations will still function correctly, where AddSession would return false. In this state, CloseGroup() blocks until sessions get drained - via remove operations. + via DoneSession() operations. - It is an error to call AddSession() with invalid values. - For example, if current session count is 1, AddSession - can only add more or subtract 1 from this. Caller needs - to make sure addition/subtraction math is correct when - using WaitGroup. + It is callers responsibility to make sure AddSessions + and DoneSessions math adds up to >= 0. In other words, + calling more DoneSessions() than sum of AddSessions() + will cause panic. Example usage: @@ -34,7 +33,7 @@ import ( // group may be closing or full return } - defer group.AddSession(-1) + defer group.DoneSession() // do stuff }(item) @@ -65,45 +64,44 @@ func (r *WaitGroup) Closer() chan struct{} { } // AddSession manipulates the session counter by -// adding or subtracting the delta value. Incrementing +// adding the delta value. Incrementing // the session counter is not possible and will set // return value to false if a close was initiated. -// It's callers responsibility to make sure addition and -// subtraction math is correct. -func (r *WaitGroup) AddSession(delta int64) bool { +func (r *WaitGroup) AddSession(delta uint64) bool { r.cond.L.Lock() defer r.cond.L.Unlock() - if delta >= 0 { - // we cannot add if we are being shutdown - if r.isClosed { - return false - } + // we cannot add if we are being shutdown + if r.isClosed { + return false + } - incr := uint64(delta) + // we have maxed out + if r.sessions == math.MaxUint64-delta { + return false + } - // we have maxed out - if r.sessions == math.MaxUint64-incr { - return false - } + r.sessions += delta + return true +} - r.sessions += incr - } else { - decr := uint64(-delta) +// DoneSession decrements 1 from accumulated +// sessions and wakes up listeners when this reaches +// zero. +func (r *WaitGroup) DoneSession() { + r.cond.L.Lock() + defer r.cond.L.Unlock() - // illegal operation, it's callers responsibility - // to make sure subtraction and addition math is correct. - if r.sessions < decr { - panic(fmt.Sprintf("common.WaitGroup misuse sum=%d decr=%d isClosed=%v", - r.sessions, decr, r.isClosed)) - } + // illegal operation, it's callers responsibility + // to make sure subtraction and addition math is correct. + if r.sessions == 0 { + panic(fmt.Sprintf("common.WaitGroup misuse isClosed=%v", r.isClosed)) + } - r.sessions -= decr - - // subtractions need to notify CloseGroup + r.sessions -= 1 + if r.sessions == 0 { r.cond.Broadcast() } - return true } // CloseGroup initiates a close and blocks until diff --git a/api/common/wait_utils_test.go b/api/common/wait_utils_test.go index 76661b498..8cc799236 100644 --- a/api/common/wait_utils_test.go +++ b/api/common/wait_utils_test.go @@ -2,6 +2,7 @@ package common import ( "testing" + "time" ) func isClosed(ch chan struct{}) bool { @@ -44,6 +45,52 @@ func TestWaitGroupEmpty(t *testing.T) { } } +func TestWaitGroupBlast(t *testing.T) { + wg := NewWaitGroup() + wg.AddSession(1) + + blastRadius := 500 + for i := 0; i < blastRadius; i++ { + go func(i int) { + if !wg.AddSession(1) { + // this is OK, we are creating a race + // and some cannot make it after main + // go-routine issues a CloseGroupNB() + return + } + + if isClosed(wg.Closer()) { + t.Fatalf("Should not be closing state") + } + + wg.DoneSession() + }(i) + } + + if isClosed(wg.Closer()) { + t.Fatalf("Should not be closing state") + } + + done := wg.CloseGroupNB() + + if !isClosed(wg.Closer()) { + t.Fatalf("Should be closing state") + } + + select { + case <-done: + t.Fatalf("NB Chan should not be closed yet, since sum is 1") + case <-time.After(time.Duration(1) * time.Second): + wg.DoneSession() + } + + select { + case <-done: + case <-time.After(time.Duration(1) * time.Second): + t.Fatalf("NB Chan should be closed by now, since sum is 0") + } +} + func TestWaitGroupSingle(t *testing.T) { wg := NewWaitGroup() @@ -60,10 +107,7 @@ func TestWaitGroupSingle(t *testing.T) { t.Fatalf("Should not be closing state yet") } - if !wg.AddSession(-1) { - t.Fatalf("Add -1 should not fail") - } - + wg.DoneSession() // sum should be zero now. if !wg.AddSession(2) { @@ -78,9 +122,8 @@ func TestWaitGroupSingle(t *testing.T) { t.Fatalf("NB Chan should not be closed yet, since sum is 2") } - if !wg.AddSession(-1) { - t.Fatalf("Add -1 should not fail") - } + wg.DoneSession() + if wg.AddSession(1) { t.Fatalf("Add 1 should fail (we are shutting down)") } @@ -106,9 +149,7 @@ func TestWaitGroupSingle(t *testing.T) { t.Fatalf("Should be closing state") } - if !wg.AddSession(-1) { - t.Fatalf("Add -1 should not fail") - } + wg.DoneSession() // sum is 0 now <-done @@ -120,5 +161,4 @@ func TestWaitGroupSingle(t *testing.T) { if !isClosed(wg.Closer()) { t.Fatalf("Should be closing state") } - }