From 759dc73bbbace45a4e8738d8fff1bd62aadc0084 Mon Sep 17 00:00:00 2001
From: Andy Walker
Date: Sat, 26 Apr 2025 02:11:46 -0400
Subject: [PATCH 1/5] WIP: tracing

---
 pool.go      |  40 +++++++++++++++++++
 pool_test.go |  44 ++++++++++++++++++--
 tracing.go   | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 192 insertions(+), 3 deletions(-)
 create mode 100644 tracing.go

diff --git a/pool.go b/pool.go
index c411d2f..2b2546b 100644
--- a/pool.go
+++ b/pool.go
@@ -40,6 +40,7 @@ type Resource[T any] struct {
 	lastUsedNano   int64
 	poolResetCount int
 	status         byte
+	traceCtx       context.Context
 }
 
 // Value returns the resource value.
@@ -114,6 +115,18 @@ func (res *Resource[T]) IdleDuration() time.Duration {
 	return time.Duration(nanotime() - res.lastUsedNano)
 }
 
+type ResourceTraceStats struct {
+	CreationTime time.Time
+	IdleDuration time.Duration
+}
+
+func (res *Resource[T]) traceStats() ResourceTraceStats {
+	return ResourceTraceStats{
+		CreationTime: res.CreationTime(),
+		IdleDuration: res.IdleDuration(),
+	}
+}
+
 // Pool is a concurrency-safe resource pool.
 type Pool[T any] struct {
 	// mux is the pool internal lock. Any modification of shared state of
@@ -135,6 +148,7 @@ type Pool[T any] struct {
 	constructor Constructor[T]
 	destructor  Destructor[T]
 	maxSize     int32
+	tracer      Tracer
 
 	acquireCount    int64
 	acquireDuration time.Duration
@@ -153,6 +167,7 @@ type Config[T any] struct {
 	Constructor Constructor[T]
 	Destructor  Destructor[T]
 	MaxSize     int32
+	Tracer      Tracer
 }
 
 // NewPool creates a new pool. Returns an error iff MaxSize is less than 1.
@@ -167,6 +182,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
 		acquireSem:     semaphore.NewWeighted(int64(config.MaxSize)),
 		idleResources:  genstack.NewGenStack[*Resource[T]](),
 		maxSize:        config.MaxSize,
+		tracer:         config.Tracer,
 		constructor:    config.Constructor,
 		destructor:     config.Destructor,
 		baseAcquireCtx: baseAcquireCtx,
@@ -352,13 +368,24 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
 //
 // This function exists solely only for benchmarking purposes.
 func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
+	var trace tracerSpan
 	startNano := nanotime()
+	if p.tracer != nil {
+		ctx = p.tracer.AcquireStart(ctx, AcquireStartData{
+			StartNano: startNano,
+		})
+		trace = tracerSpan{
+			t:     p.tracer,
+			start: startNano,
+		}
+	}
 
 	var waitedForLock bool
 	if !p.acquireSem.TryAcquire(1) {
 		waitedForLock = true
 		err := p.acquireSem.Acquire(ctx, 1)
 		if err != nil {
+			defer trace.acquireEndErr(ctx, err)
 			p.canceledAcquireCount.Add(1)
 			return nil, err
 		}
@@ -366,6 +393,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 
 	p.mux.Lock()
 	if p.closed {
+		defer trace.acquireEndErr(ctx, ErrClosedPool)
 		p.acquireSem.Release(1)
 		p.mux.Unlock()
 		return nil, ErrClosedPool
@@ -375,8 +403,14 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 	if res := p.tryAcquireIdleResource(); res != nil {
 		waitTime := time.Duration(nanotime() - startNano)
 		if waitedForLock {
+			defer trace.acquireEnd(ctx, waitTime, res.traceStats, false)
 			p.emptyAcquireCount += 1
 			p.emptyAcquireWaitTime += waitTime
+		} else {
+			// TODO(draft): this else block is an artifact of waitTime being
+			// tracked separately as a metric. If this PR ends up removing
+			// metrics altogether, this will be a bit simpler.
+			defer trace.acquireEnd(ctx, 0, res.traceStats, false)
 		}
 		p.acquireCount += 1
 		p.acquireDuration += waitTime
@@ -395,8 +429,14 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 
 	res, err := p.initResourceValue(ctx, res)
 	if err != nil {
+		defer trace.acquireEndErr(ctx, err)
 		return nil, err
 	}
+	if waitedForLock {
+		defer trace.acquireEnd(ctx, time.Duration(nanotime()-startNano), res.traceStats, true)
+	} else {
+		defer trace.acquireEnd(ctx, 0, res.traceStats, true)
+	}
 
 	p.mux.Lock()
 	defer p.mux.Unlock()
diff --git a/pool_test.go b/pool_test.go
index a313568..54543b2 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -20,6 +20,31 @@ import (
 	"golang.org/x/sync/semaphore"
 )
 
+// TODO(draft) This is just to demo.
+// Will do testable example for final PR.
+type testLogTracer struct {
+	puddle.BaseTracer
+	t *testing.T
+}
+
+func (t *testLogTracer) Logf(format string, a ...any) {
+	t.t.Helper()
+	t.t.Logf(format, a...)
+}
+
+func (t *testLogTracer) AcquireEnd(ctx context.Context, data puddle.AcquireEndData) {
+	t.t.Helper()
+	if data.Err != nil {
+		t.Logf("Acquire error after %v: %v", data.AcquireDuration, data.Err)
+		return
+	}
+	if data.InitDuration > 0 {
+		t.Logf("got new resource after %v. blocked for %v, init took: %v", data.AcquireDuration, data.WaitDuration, data.InitDuration)
+		return
+	}
+	t.Logf("got pooled resource after %v. blocked for %v, resource is %v old and has been idle for %v", data.AcquireDuration, data.WaitDuration, time.Since(data.ResourceStats.CreationTime), data.ResourceStats.IdleDuration)
+}
+
 type Counter struct {
 	mutex sync.Mutex
 	n     int
@@ -218,7 +243,14 @@ func TestPoolAcquireCreatesResourceRespectingContext(t *testing.T) {
 
 func TestPoolAcquireReusesResources(t *testing.T) {
 	constructor, createCounter := createConstructor()
-	pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
+	pool, err := puddle.NewPool(&puddle.Config[int]{
+		Constructor: constructor,
+		Destructor:  stubDestructor,
+		MaxSize:     10,
+		Tracer: &testLogTracer{
+			t: t,
+		},
+	})
 	require.NoError(t, err)
 
 	res, err := pool.Acquire(context.Background())
@@ -328,7 +360,14 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
 		}
 		return constructorCalls.Next(), nil
 	}
-	pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
+	pool, err := puddle.NewPool(&puddle.Config[int]{
+		Constructor: constructor,
+		Destructor:  stubDestructor,
+		MaxSize:     10,
+		Tracer: &testLogTracer{
+			t: t,
+		},
+	})
 	require.NoError(t, err)
 
 	res, err := pool.Acquire(ctx)
@@ -1173,7 +1212,6 @@ func startAcceptOnceDummyServer(laddr string) {
 			}
 		}
 	}()
-
 }
 
 func ExamplePool() {
diff --git a/tracing.go b/tracing.go
new file mode 100644
index 0000000..dbdf0fd
--- /dev/null
+++ b/tracing.go
@@ -0,0 +1,111 @@
+package puddle
+
+import (
+	"context"
+	"time"
+)
+
+type traceCtxKey int
+
+const (
+	_ traceCtxKey = iota
+	acquireSpan
+	releaseSpan
+)
+
+// Tracer traces pool actions.
+type Tracer interface {
+	// AcquireStart is called at the beginning of Acquire calls. The return
+	// context is used for the rest of the call and will be passed to AcquireEnd
+	AcquireStart(ctx context.Context, data AcquireStartData) context.Context
+	AcquireEnd(ctx context.Context, data AcquireEndData)
+	// ReleaseStart is called at the beginning of Release calls. The return
+	// context is used for the rest of the call and will be passed to AcquireEnd
+	ReleaseStart(ctx context.Context, data ReleaseStartData) context.Context
+	ReleaseEnd(ctx context.Context, data ReleaseEndData)
+}
+
+type AcquireStartData struct {
+	StartNano int64
+}
+
+type AcquireEndData struct {
+	WaitDuration    time.Duration
+	AcquireDuration time.Duration
+	InitDuration    time.Duration
+	ResourceStats   ResourceTraceStats
+	Err             error
+}
+
+type ReleaseTracer interface{}
+
+type ReleaseStartData struct {
+	HeldDuration time.Duration
+}
+
+type ReleaseEndData struct {
+	BlockDuration time.Duration
+	Err           error
+}
+
+type tracerSpan struct {
+	t     Tracer
+	start int64
+}
+
+func (t tracerSpan) acquireEndErr(ctx context.Context, err error) {
+	if t.t == nil {
+		return
+	}
+	t.t.AcquireEnd(ctx, AcquireEndData{
+		AcquireDuration: time.Duration(nanotime() - t.start),
+		Err:             err,
+	})
+}
+
+func (t tracerSpan) acquireEnd(ctx context.Context, waitTime time.Duration, stats statFn, isNew bool) {
+	if t.t == nil {
+		return
+	}
+	var initDuration time.Duration
+	resourceStats := stats()
+	if isNew {
+		initDuration = time.Since(resourceStats.CreationTime)
+	}
+	t.t.AcquireEnd(ctx, AcquireEndData{
+		WaitDuration:    waitTime,
+		AcquireDuration: time.Duration(nanotime() - t.start),
+		InitDuration:    initDuration,
+		ResourceStats:   resourceStats,
+	})
+}
+
+type statFn func() ResourceTraceStats
+
+// BaseTracer implements [Tracer] methods as no-ops.
+//
+// It is intended to be composed with types that only need to implement a subset
+// of Tracer methods.
+//
+// Example usage:
+//
+//	// MyTracer only hooks AcquireEnd
+//	type MyTracer struct {
+//		pool.BaseTracer
+//	}
+//
+//	func (t MyTracer) AcquireEnd(ctx context.Context, d AcquireEndData) {
+//		/* do something with d */
+//	}
+type BaseTracer struct{}
+
+func (BaseTracer) AcquireStart(ctx context.Context, _ AcquireStartData) context.Context {
+	return ctx
+}
+func (BaseTracer) AcquireEnd(context.Context, AcquireEndData) {}
+func (BaseTracer) ReleaseStart(ctx context.Context, _ ReleaseStartData) context.Context {
+	return ctx
+}
+func (BaseTracer) ReleaseEnd(context.Context, ReleaseEndData) {}
+
+var _ Tracer = BaseTracer{}

From 395c859c3d4717db8c34bcb1d3472117094418de Mon Sep 17 00:00:00 2001
From: Andy Walker
Date: Sat, 26 Apr 2025 02:34:12 -0400
Subject: [PATCH 2/5] remove dead code around context.Value

- Might need something similar for tracing resource lifecycle stuff,
  since there's no context integration into Release() et al.
---
 pool.go    | 1 -
 tracing.go | 8 --------
 2 files changed, 9 deletions(-)

diff --git a/pool.go b/pool.go
index 2b2546b..1666c3a 100644
--- a/pool.go
+++ b/pool.go
@@ -40,7 +40,6 @@ type Resource[T any] struct {
 	lastUsedNano   int64
 	poolResetCount int
 	status         byte
-	traceCtx       context.Context
 }
 
 // Value returns the resource value.
diff --git a/tracing.go b/tracing.go
index dbdf0fd..ea82227 100644
--- a/tracing.go
+++ b/tracing.go
@@ -5,14 +5,6 @@ import (
 	"time"
 )
 
-type traceCtxKey int
-
-const (
-	_ traceCtxKey = iota
-	acquireSpan
-	releaseSpan
-)
-
 // Tracer traces pool actions.
 type Tracer interface {
 	// AcquireStart is called at the beginning of Acquire calls. The return

From f26e80021ba2c04b3dbdaf3444106cf53ad65b9b Mon Sep 17 00:00:00 2001
From: Andy Walker
Date: Mon, 28 Apr 2025 16:01:03 -0400
Subject: [PATCH 3/5] doc: fix ReleaseEnd name

---
 tracing.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tracing.go b/tracing.go
index ea82227..afc9352 100644
--- a/tracing.go
+++ b/tracing.go
@@ -12,7 +12,7 @@ type Tracer interface {
 	AcquireStart(ctx context.Context, data AcquireStartData) context.Context
 	AcquireEnd(ctx context.Context, data AcquireEndData)
 	// ReleaseStart is called at the beginning of Release calls. The return
-	// context is used for the rest of the call and will be passed to AcquireEnd
+	// context is used for the rest of the call and will be passed to ReleaseEnd
 	ReleaseStart(ctx context.Context, data ReleaseStartData) context.Context
 	ReleaseEnd(ctx context.Context, data ReleaseEndData)
 }

From 535c347c22c13f336fb70eebae582ff0848158fb Mon Sep 17 00:00:00 2001
From: Andy Walker
Date: Mon, 28 Apr 2025 16:01:29 -0400
Subject: [PATCH 4/5] chore: remove unused ReleaseTracer iface

---
 tracing.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/tracing.go b/tracing.go
index afc9352..84b2e4a 100644
--- a/tracing.go
+++ b/tracing.go
@@ -29,8 +29,6 @@ type AcquireEndData struct {
 	Err             error
 }
 
-type ReleaseTracer interface{}
-
 type ReleaseStartData struct {
 	HeldDuration time.Duration
 }

From bccc38f8afd7268affa4e6d58c0c703e0749b04e Mon Sep 17 00:00:00 2001
From: Andy Walker
Date: Mon, 28 Apr 2025 16:02:57 -0400
Subject: [PATCH 5/5] reorg: declare waitTime in main function body

this change reduces the need to have two separate calls to
trace.acquireEnd for each instance (warm vs. new resource)
---
 pool.go | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)

diff --git a/pool.go b/pool.go
index 1666c3a..245ad26 100644
--- a/pool.go
+++ b/pool.go
@@ -380,6 +380,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 	}
 
 	var waitedForLock bool
+	var waitTime time.Duration
 	if !p.acquireSem.TryAcquire(1) {
 		waitedForLock = true
 		err := p.acquireSem.Acquire(ctx, 1)
@@ -400,17 +401,12 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 
 	// If a resource is available in the pool.
 	if res := p.tryAcquireIdleResource(); res != nil {
-		waitTime := time.Duration(nanotime() - startNano)
+		waitTime = time.Duration(nanotime() - startNano)
 		if waitedForLock {
-			defer trace.acquireEnd(ctx, waitTime, res.traceStats, false)
 			p.emptyAcquireCount += 1
 			p.emptyAcquireWaitTime += waitTime
-		} else {
-			// TODO(draft): this else block is an artifact of waitTime being
-			// tracked separately as a metric. If this PR ends up removing
-			// metrics altogether, this will be a bit simpler.
-			defer trace.acquireEnd(ctx, 0, res.traceStats, false)
 		}
+		defer trace.acquireEnd(ctx, waitTime, res.traceStats, false)
 		p.acquireCount += 1
 		p.acquireDuration += waitTime
 		p.mux.Unlock()
@@ -431,18 +427,14 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
 		defer trace.acquireEndErr(ctx, err)
 		return nil, err
 	}
-	if waitedForLock {
-		defer trace.acquireEnd(ctx, time.Duration(nanotime()-startNano), res.traceStats, true)
-	} else {
-		defer trace.acquireEnd(ctx, 0, res.traceStats, true)
-	}
 
 	p.mux.Lock()
 	defer p.mux.Unlock()
 
 	p.emptyAcquireCount += 1
 	p.acquireCount += 1
-	waitTime := time.Duration(nanotime() - startNano)
+	waitTime = time.Duration(nanotime() - startNano)
+	defer trace.acquireEnd(ctx, waitTime, res.traceStats, true)
 	p.acquireDuration += waitTime
 	p.emptyAcquireWaitTime += waitTime