From fc0d3d49d238ffccbaf18bc189910d57a2a36269 Mon Sep 17 00:00:00 2001 From: Tolga Ceylan Date: Wed, 11 Apr 2018 16:42:59 -0700 Subject: [PATCH] fn: introducing WaitGroup suitable for shutdown/session mgmt (#936) * fn: introducing WaitGroup suitable for shutdown/session mgmt --- api/common/wait_utils.go | 104 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 api/common/wait_utils.go diff --git a/api/common/wait_utils.go b/api/common/wait_utils.go new file mode 100644 index 000000000..cc1f5c35f --- /dev/null +++ b/api/common/wait_utils.go @@ -0,0 +1,104 @@ +package common + +import ( + "math" + "sync" +) + +/* + WaitGroup is used to manage and wait for a collection of + sessions. It is similar to sync.WaitGroup, but + AddSession/RmSession/WaitClose session is not only thread + safe but can be executed in any order unlike sync.WaitGroup. + + Once a shutdown is initiated via CloseGroup(), add/rm + operations will still function correctly, where + AddSession would return false error. + In this state, CloseGroup() blocks until sessions get drained + via RmSession() calls. + + It is an error to call RmSession without a corresponding + successful AddSession. + + Example usage: + + group := NewWaitGroup() + + for item := range(items) { + go func(item string) { + if !group.AddSession() { + // group may be closing or full + return + } + defer group.RmSession() + + // do stuff + }(item) + } + + // close the group and wait for active item. + group.CloseGroup() +*/ + +type WaitGroup struct { + cond *sync.Cond + isClosed bool + sessions uint64 +} + +func NewWaitGroup() *WaitGroup { + return &WaitGroup{ + cond: sync.NewCond(new(sync.Mutex)), + } +} + +func (r *WaitGroup) AddSession() bool { + r.cond.L.Lock() + defer r.cond.L.Unlock() + + if r.isClosed { + return false + } + if r.sessions == math.MaxUint64 { + return false + } + + r.sessions++ + return true +} + +func (r *WaitGroup) RmSession() { + r.cond.L.Lock() + + if r.sessions == 0 { + panic("WaitGroup misuse: no sessions to remove") + } + + r.sessions-- + r.cond.Broadcast() + + r.cond.L.Unlock() +} + +func (r *WaitGroup) CloseGroup() { + r.cond.L.Lock() + + r.isClosed = true + for r.sessions > 0 { + r.cond.Wait() + } + + r.cond.L.Unlock() +} + +func (r *WaitGroup) CloseGroupNB() chan struct{} { + + closer := make(chan struct{}) + + go func() { + defer close(closer) + r.CloseGroup() + }() + + return closer +}