Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions group_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package syncs
import "context"

type options struct {
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
discardIfFull bool
ctx context.Context
cancel context.CancelFunc
preLock bool
termOnError bool
discardIfFull bool
waitQueue int
notBlockCaller bool
}

// GroupOption functional option type
Expand Down Expand Up @@ -35,3 +37,15 @@ func Discard(o *options) {
o.discardIfFull = true
o.preLock = true // discard implies preemptive
}

// NonBlocking wait in caller or in goroutine
func NonBlocking(o *options) {
o.notBlockCaller = true
}

// WaitQueue size sets maximum amount of waiters that will block
func WaitQueue(size int) GroupOption {
return func(o *options) {
o.waitQueue = size
}
}
43 changes: 35 additions & 8 deletions sizedgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
// SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup
type SizedGroup struct {
options
wg sync.WaitGroup
sema Locker
wg sync.WaitGroup
sema Locker
waiters *Locker
}

// NewSizedGroup makes wait group with limited size alive goroutines
Expand All @@ -21,6 +22,10 @@ func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {
for _, opt := range opts {
opt(&res.options)
}
if res.options.waitQueue != 0 {
var ptr = NewSemaphore(res.options.waitQueue)
res.waiters = &ptr
}
return &res
}

Expand All @@ -40,21 +45,43 @@ func (g *SizedGroup) Go(fn func(ctx context.Context)) {
return
}

waitForWaiters := false
if g.preLock {
lockOk := g.sema.TryLock()
if !lockOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine
return
}
if !lockOk && !g.discardIfFull {
g.sema.Lock() // make sure we have block until lock is acquired
if !lockOk {
waiterOk := false
if g.waiters != nil {
waiterOk = (*g.waiters).TryLock()
}
if !waiterOk && g.discardIfFull {
// lock failed and discardIfFull is set, discard this goroutine
return
}
if !waiterOk && g.waiters != nil {
// wait for waiters and then for sema
(*g.waiters).Lock()
}
waitForWaiters = true
if !g.options.notBlockCaller {
g.sema.Lock() // make sure we have block until lock is acquired
if g.waiters != nil {
(*g.waiters).Unlock()
}
}
}
}

g.wg.Add(1)
go func() {
defer g.wg.Done()

if waitForWaiters && g.preLock && g.options.notBlockCaller {
g.sema.Lock() // make sure we have block until lock is acquired
if g.waiters != nil {
(*g.waiters).Unlock()
}
}

if canceled() {
return
}
Expand Down
75 changes: 75 additions & 0 deletions sizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,78 @@ func ExampleSizedGroup_go() {

grp.Wait() // wait for completion
}

func TestSizedGroupWaiters(t *testing.T) {
swg := NewSizedGroup(10, Preemptive, WaitQueue(10))
var c uint32

timeNow := time.Now()

for i := 0; i < 300; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
swg.Wait()
timeDiff := time.Since(timeNow)
assert.GreaterOrEqual(t, timeDiff, time.Millisecond*5*(300/10))
assert.Equal(t, uint32(300), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroupWaiters_NonBlocking(t *testing.T) {
swg := NewSizedGroup(10, NonBlocking, Preemptive, WaitQueue(10))
var c uint32

timeNow := time.Now()

for i := 0; i < 300; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
swg.Wait()
timeDiff := time.Since(timeNow)
assert.GreaterOrEqual(t, timeDiff, time.Millisecond*5*(300/10))
assert.Equal(t, uint32(300), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroupWaiters_Discard(t *testing.T) {
swg := NewSizedGroup(10, Preemptive, Discard, WaitQueue(5), NonBlocking)
var c uint32

timeNow := time.Now()
for i := 0; i < 10; i++ {
newI := i
swg.Go(func(ctx context.Context) {
if newI < 5 {
time.Sleep(3 * time.Second)
} else {
time.Sleep(1 * time.Second)
}
atomic.AddUint32(&c, 1)
})
}
{
timeNow2 := time.Now()
for i := 0; i < 5; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(1 * time.Second)
atomic.AddUint32(&c, 1)
})
}
timeDiff := time.Since(timeNow2)
assert.True(t, timeDiff < time.Second+time.Millisecond*100)
}
for i := 0; i < 10; i++ {
swg.Go(func(ctx context.Context) {
panic("wrong logic")
})
}

swg.Wait()
timeDiff := time.Since(timeNow)
assert.True(t, (timeDiff > time.Second*3) && (timeDiff < time.Second*3+time.Millisecond*100))
assert.Equal(t, uint32(15), c, fmt.Sprintf("%d, not all routines have been executed", c))
}