Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ func (res *Resource[T]) IdleDuration() time.Duration {
return time.Duration(nanotime() - res.lastUsedNano)
}

type ResourceTraceStats struct {
CreationTime time.Time
IdleDuration time.Duration
}

func (res *Resource[T]) traceStats() ResourceTraceStats {
return ResourceTraceStats{
CreationTime: res.CreationTime(),
IdleDuration: res.IdleDuration(),
}
}

// Pool is a concurrency-safe resource pool.
type Pool[T any] struct {
// mux is the pool internal lock. Any modification of shared state of
Expand All @@ -135,6 +147,7 @@ type Pool[T any] struct {
constructor Constructor[T]
destructor Destructor[T]
maxSize int32
tracer Tracer

acquireCount int64
acquireDuration time.Duration
Expand All @@ -153,6 +166,7 @@ type Config[T any] struct {
Constructor Constructor[T]
Destructor Destructor[T]
MaxSize int32
Tracer Tracer
}

// NewPool creates a new pool. Returns an error iff MaxSize is less than 1.
Expand All @@ -167,6 +181,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
idleResources: genstack.NewGenStack[*Resource[T]](),
maxSize: config.MaxSize,
tracer: config.Tracer,
constructor: config.Constructor,
destructor: config.Destructor,
baseAcquireCtx: baseAcquireCtx,
Expand Down Expand Up @@ -352,32 +367,46 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
//
// This function exists solely only for benchmarking purposes.
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
var trace tracerSpan
startNano := nanotime()
if p.tracer != nil {
ctx = p.tracer.AcquireStart(ctx, AcquireStartData{
StartNano: startNano,
})
trace = tracerSpan{
t: p.tracer,
start: startNano,
}
}

var waitedForLock bool
var waitTime time.Duration
if !p.acquireSem.TryAcquire(1) {
waitedForLock = true
err := p.acquireSem.Acquire(ctx, 1)
if err != nil {
defer trace.acquireEndErr(ctx, err)
p.canceledAcquireCount.Add(1)
return nil, err
}
}

p.mux.Lock()
if p.closed {
defer trace.acquireEndErr(ctx, ErrClosedPool)
p.acquireSem.Release(1)
p.mux.Unlock()
return nil, ErrClosedPool
}

// If a resource is available in the pool.
if res := p.tryAcquireIdleResource(); res != nil {
waitTime := time.Duration(nanotime() - startNano)
waitTime = time.Duration(nanotime() - startNano)
if waitedForLock {
p.emptyAcquireCount += 1
p.emptyAcquireWaitTime += waitTime
}
defer trace.acquireEnd(ctx, waitTime, res.traceStats, false)
p.acquireCount += 1
p.acquireDuration += waitTime
p.mux.Unlock()
Expand All @@ -395,6 +424,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {

res, err := p.initResourceValue(ctx, res)
if err != nil {
defer trace.acquireEndErr(ctx, err)
return nil, err
}

Expand All @@ -403,7 +433,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {

p.emptyAcquireCount += 1
p.acquireCount += 1
waitTime := time.Duration(nanotime() - startNano)
waitTime = time.Duration(nanotime() - startNano)
defer trace.acquireEnd(ctx, waitTime, res.traceStats, true)
p.acquireDuration += waitTime
p.emptyAcquireWaitTime += waitTime

Expand Down
44 changes: 41 additions & 3 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ import (
"golang.org/x/sync/semaphore"
)

// TODO(draft) This is just to demo.
// Will do testable example for final PR.
type testLogTracer struct {
puddle.BaseTracer
t *testing.T
}

func (t *testLogTracer) Logf(format string, a ...any) {
t.t.Helper()
t.t.Logf(format, a...)
}

func (t *testLogTracer) AcquireEnd(ctx context.Context, data puddle.AcquireEndData) {
t.t.Helper()
if data.Err != nil {
t.Logf("Acquire error after %v: %v", data.AcquireDuration, data.Err)
return
}
if data.InitDuration > 0 {
t.Logf("got new resource after %v. blocked for %v, init took: %v", data.AcquireDuration, data.WaitDuration, data.InitDuration)
return
}
t.Logf("got pooled resource after %v. blocked for %v, resource is %v old and has been idle for %v", data.AcquireDuration, data.WaitDuration, time.Since(data.ResourceStats.CreationTime), data.ResourceStats.IdleDuration)
}

type Counter struct {
mutex sync.Mutex
n int
Expand Down Expand Up @@ -218,7 +243,14 @@ func TestPoolAcquireCreatesResourceRespectingContext(t *testing.T) {

func TestPoolAcquireReusesResources(t *testing.T) {
constructor, createCounter := createConstructor()
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
pool, err := puddle.NewPool(&puddle.Config[int]{
Constructor: constructor,
Destructor: stubDestructor,
MaxSize: 10,
Tracer: &testLogTracer{
t: t,
},
})
require.NoError(t, err)

res, err := pool.Acquire(context.Background())
Expand Down Expand Up @@ -328,7 +360,14 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
}
return constructorCalls.Next(), nil
}
pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
pool, err := puddle.NewPool(&puddle.Config[int]{
Constructor: constructor,
Destructor: stubDestructor,
MaxSize: 10,
Tracer: &testLogTracer{
t: t,
},
})
require.NoError(t, err)

res, err := pool.Acquire(ctx)
Expand Down Expand Up @@ -1173,7 +1212,6 @@ func startAcceptOnceDummyServer(laddr string) {
}
}
}()

}

func ExamplePool() {
Expand Down
101 changes: 101 additions & 0 deletions tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package puddle

import (
"context"
"time"
)

// Tracer traces pool actions.
type Tracer interface {
// AcquireStart is called at the beginning of Acquire calls. The return
// context is used for the rest of the call and will be passed to AcquireEnd
AcquireStart(ctx context.Context, data AcquireStartData) context.Context
AcquireEnd(ctx context.Context, data AcquireEndData)
// ReleaseStart is called at the beginning of Release calls. The return
// context is used for the rest of the call and will be passed to ReleaseEnd
ReleaseStart(ctx context.Context, data ReleaseStartData) context.Context
ReleaseEnd(ctx context.Context, data ReleaseEndData)
}

type AcquireStartData struct {
StartNano int64
}

type AcquireEndData struct {
WaitDuration time.Duration
AcquireDuration time.Duration
InitDuration time.Duration
ResourceStats ResourceTraceStats
Err error
}

type ReleaseStartData struct {
HeldDuration time.Duration
}

type ReleaseEndData struct {
BlockDuration time.Duration
Err error
}

type tracerSpan struct {
t Tracer
start int64
}

func (t tracerSpan) acquireEndErr(ctx context.Context, err error) {
if t.t == nil {
return
}
t.t.AcquireEnd(ctx, AcquireEndData{
AcquireDuration: time.Duration(nanotime() - t.start),
Err: err,
})
}

func (t tracerSpan) acquireEnd(ctx context.Context, waitTime time.Duration, stats statFn, isNew bool) {
if t.t == nil {
return
}
var initDuration time.Duration
resourceStats := stats()
if isNew {
initDuration = time.Since(resourceStats.CreationTime)
}
t.t.AcquireEnd(ctx, AcquireEndData{
WaitDuration: waitTime,
AcquireDuration: time.Duration(nanotime() - t.start),
InitDuration: initDuration,
ResourceStats: resourceStats,
})
}

type statFn func() ResourceTraceStats

// BaseTracer implements [Tracer] methods as no-ops.
//
// It is intended to be composed with types that only need to implement a subset
// of Tracer methods.
//
// Example usage:
//
// // MyTracer only hooks AcquireEnd
// type MyTracer struct {
// pool.BaseTracer
// }
//
// func AcquireEnd(ctx context.Context, d AcquireEndData) {
// /* do something with d */
// }
type BaseTracer struct{}

func (BaseTracer) AcquireStart(ctx context.Context, _ AcquireStartData) context.Context {
return ctx
}
func (BaseTracer) AcquireEnd(context.Context, AcquireEndData) {}
func (BaseTracer) ReleaseStart(ctx context.Context, _ ReleaseStartData) context.Context {
return ctx
}
func (BaseTracer) ReleaseEnd(context.Context, ReleaseEndData) {}

var _ Tracer = BaseTracer{}