mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
* fn: common.WaitGroup improvements *) Split the API into AddSession/DoneSession *) Only wake up listeners when session count reaches zero. * fn: WaitGroup go-routine blast test * fn: test fix and rebase fixup
145 lines
3.0 KiB
Go
145 lines
3.0 KiB
Go
package common
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"sync"
|
|
)
|
|
|
|
/*
|
|
WaitGroup is used to manage and wait for a collection of
|
|
sessions. It is similar to sync.WaitGroup, but
|
|
AddSession/DoneSession/CloseGroup session is not only thread
|
|
safe but can be executed in any order unlike sync.WaitGroup.
|
|
|
|
Once a shutdown is initiated via CloseGroup(), add/done
|
|
operations will still function correctly, where
|
|
AddSession would return false. In this state,
|
|
CloseGroup() blocks until sessions get drained
|
|
via DoneSession() operations.
|
|
|
|
It is callers responsibility to make sure AddSessions
|
|
and DoneSessions math adds up to >= 0. In other words,
|
|
calling more DoneSessions() than sum of AddSessions()
|
|
will cause panic.
|
|
|
|
Example usage:
|
|
|
|
group := NewWaitGroup()
|
|
|
|
for item := range(items) {
|
|
go func(item string) {
|
|
if !group.AddSession(1) {
|
|
// group may be closing or full
|
|
return
|
|
}
|
|
defer group.DoneSession()
|
|
|
|
// do stuff
|
|
}(item)
|
|
}
|
|
|
|
// close the group and wait for active item.
|
|
group.CloseGroup()
|
|
*/
|
|
|
|
type WaitGroup struct {
|
|
cond *sync.Cond
|
|
closer chan struct{}
|
|
isClosed bool
|
|
sessions uint64
|
|
}
|
|
|
|
func NewWaitGroup() *WaitGroup {
|
|
return &WaitGroup{
|
|
cond: sync.NewCond(new(sync.Mutex)),
|
|
closer: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Closer returns a channel that is closed if
|
|
// WaitGroup is in closing state
|
|
func (r *WaitGroup) Closer() chan struct{} {
|
|
return r.closer
|
|
}
|
|
|
|
// AddSession manipulates the session counter by
|
|
// adding the delta value. Incrementing
|
|
// the session counter is not possible and will set
|
|
// return value to false if a close was initiated.
|
|
func (r *WaitGroup) AddSession(delta uint64) bool {
|
|
r.cond.L.Lock()
|
|
defer r.cond.L.Unlock()
|
|
|
|
// we cannot add if we are being shutdown
|
|
if r.isClosed {
|
|
return false
|
|
}
|
|
|
|
// we have maxed out
|
|
if r.sessions == math.MaxUint64-delta {
|
|
return false
|
|
}
|
|
|
|
r.sessions += delta
|
|
return true
|
|
}
|
|
|
|
// DoneSession decrements 1 from accumulated
|
|
// sessions and wakes up listeners when this reaches
|
|
// zero.
|
|
func (r *WaitGroup) DoneSession() {
|
|
r.cond.L.Lock()
|
|
defer r.cond.L.Unlock()
|
|
|
|
// illegal operation, it's callers responsibility
|
|
// to make sure subtraction and addition math is correct.
|
|
if r.sessions == 0 {
|
|
panic(fmt.Sprintf("common.WaitGroup misuse isClosed=%v", r.isClosed))
|
|
}
|
|
|
|
r.sessions -= 1
|
|
if r.sessions == 0 {
|
|
r.cond.Broadcast()
|
|
}
|
|
}
|
|
|
|
// CloseGroup initiates a close and blocks until
|
|
// session counter becomes zero.
|
|
func (r *WaitGroup) CloseGroup() {
|
|
r.cond.L.Lock()
|
|
|
|
if !r.isClosed {
|
|
r.isClosed = true
|
|
close(r.closer)
|
|
}
|
|
|
|
for r.sessions != 0 {
|
|
r.cond.Wait()
|
|
}
|
|
|
|
r.cond.L.Unlock()
|
|
}
|
|
|
|
// CloseGroupNB is non-blocking version of CloseGroup
|
|
// which returns a channel that can be waited on.
|
|
func (r *WaitGroup) CloseGroupNB() chan struct{} {
|
|
|
|
// set to closing state immediately
|
|
r.cond.L.Lock()
|
|
if !r.isClosed {
|
|
r.isClosed = true
|
|
close(r.closer)
|
|
}
|
|
r.cond.L.Unlock()
|
|
|
|
closer := make(chan struct{})
|
|
|
|
go func() {
|
|
defer close(closer)
|
|
r.CloseGroup()
|
|
}()
|
|
|
|
return closer
|
|
}
|