Skip to content

Commit 02f9fc9

Browse files
authored
Node hint traces for session pool (#1976)
1 parent 2bd7589 commit 02f9fc9

File tree

13 files changed

+127
-33
lines changed

13 files changed

+127
-33
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Added `trace.NodeHintInfo` field for OnPoolGet trace callback which stores info for node hint misses
2+
* Added `ydb_go_sdk_ydb_table_pool_node_hint_miss` and `ydb_go_sdk_ydb_query_pool_node_hint_miss` metrics for node hint misses
3+
14
## v3.121.1
25
* Added support for `Timestamp64` type in `value.Any` converter
36
* Masked the sensitive credential data in the connection string (DSN, data source name) from error messages for security reasons

internal/pool/pool.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xlist"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2121
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
22+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
2223
)
2324

2425
type (
@@ -771,9 +772,10 @@ func needCloseItem[PT ItemConstraint[T], T any](c *Config[PT, T], info itemInfo[
771772

772773
func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { //nolint:funlen
773774
var (
774-
start = p.config.clock.Now()
775-
attempt int
776-
lastErr error
775+
start = p.config.clock.Now()
776+
attempt int
777+
lastErr error
778+
nodeHintInfo *trace.NodeHintInfo
777779
)
778780

779781
if onGet := p.config.trace.OnGet; onGet != nil {
@@ -782,7 +784,7 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
782784
)
783785
if onDone != nil {
784786
defer func() {
785-
onDone(item, attempt, finalErr)
787+
onDone(item, attempt, nodeHintInfo, finalErr)
786788
}()
787789
}
788790
}
@@ -809,7 +811,15 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
809811
}
810812
}
811813

812-
return p.removeFirstIdle()
814+
idle := p.removeFirstIdle()
815+
if hasPreferredNodeID {
816+
nodeHintInfo = &trace.NodeHintInfo{
817+
PreferredNodeID: preferredNodeID,
818+
SessionNodeID: idle.NodeID(),
819+
}
820+
}
821+
822+
return idle
813823
}); item != nil {
814824
if item.IsAlive() {
815825
info := xsync.WithLock(&p.mu, func() itemInfo[PT, T] {

internal/pool/pool_test.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
xtest "github.com/ydb-platform/ydb-go-sdk/v3/pkg/xtest"
2828
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
2929
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
30+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
3031
)
3132

3233
type (
@@ -67,8 +68,13 @@ var defaultTrace = &Trace{
6768
return func(err error) {
6869
}
6970
},
70-
OnGet: func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) {
71-
return func(item any, attempts int, err error) {
71+
OnGet: func(ctx *context.Context, call stack.Caller) func(
72+
item any,
73+
attempts int,
74+
nodeHintInfo *trace.NodeHintInfo,
75+
err error,
76+
) {
77+
return func(item any, attempts int, nodeHintInfo *trace.NodeHintInfo, err error) {
7278
}
7379
},
7480
onWait: func() func(item any, err error) {
@@ -183,6 +189,22 @@ func TestPool(t *testing.T) { //nolint:gocyclo
183189
require.NoError(t, err)
184190
})
185191
t.Run("RequireNodeIdFromPool", func(t *testing.T) {
192+
hintTrace := defaultTrace
193+
var preferredID uint32
194+
var sessionID uint32
195+
hintTrace.OnGet = func(ctx *context.Context, call stack.Caller) func(
196+
item any,
197+
attempts int,
198+
nodeHintInfo *trace.NodeHintInfo,
199+
err error,
200+
) {
201+
return func(item any, attempts int, nodeHintInfo *trace.NodeHintInfo, err error) {
202+
if nodeHintInfo != nil {
203+
preferredID = nodeHintInfo.PreferredNodeID
204+
sessionID = nodeHintInfo.SessionNodeID
205+
}
206+
}
207+
}
186208
nextNodeID := uint32(0)
187209
var newItemCalled uint32
188210
p := New[*testItem, testItem](rootCtx,
@@ -201,6 +223,7 @@ func TestPool(t *testing.T) { //nolint:gocyclo
201223

202224
return &v, nil
203225
}),
226+
WithLimit[*testItem, testItem](3),
204227
)
205228

206229
item := mustGetItem(t, p)
@@ -262,7 +285,10 @@ func TestPool(t *testing.T) { //nolint:gocyclo
262285
mustPutItem(t, p, item)
263286
mustPutItem(t, p, item2)
264287
mustPutItem(t, p, item3)
265-
288+
item, err = p.getItem(endpoint.WithNodeID(context.Background(), 100))
289+
require.NoError(t, err)
290+
require.EqualValues(t, 100, preferredID)
291+
require.EqualValues(t, item.NodeID(), sessionID)
266292
require.EqualValues(t, 3, newItemCalled)
267293
})
268294
t.Run("CreateItemOnGivenNode", func(t *testing.T) {
@@ -471,7 +497,12 @@ func TestPool(t *testing.T) { //nolint:gocyclo
471497
mustGetItem(t, p)
472498

473499
go func() {
474-
p.config.trace.OnGet = func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) {
500+
p.config.trace.OnGet = func(ctx *context.Context, call stack.Caller) func(
501+
item any,
502+
attempts int,
503+
_ *trace.NodeHintInfo,
504+
err error,
505+
) {
475506
get <- struct{}{}
476507

477508
return nil

internal/pool/trace.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@ import (
44
"context"
55

66
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
7+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
78
)
89

910
type (
1011
Trace struct {
11-
OnNew func(ctx *context.Context, call stack.Caller) func(limit int)
12-
OnClose func(ctx *context.Context, call stack.Caller) func(err error)
13-
OnTry func(ctx *context.Context, call stack.Caller) func(err error)
14-
OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error)
15-
OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error)
16-
OnGet func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error)
12+
OnNew func(ctx *context.Context, call stack.Caller) func(limit int)
13+
OnClose func(ctx *context.Context, call stack.Caller) func(err error)
14+
OnTry func(ctx *context.Context, call stack.Caller) func(err error)
15+
OnWith func(ctx *context.Context, call stack.Caller) func(attempts int, err error)
16+
OnPut func(ctx *context.Context, call stack.Caller, item any) func(err error)
17+
OnGet func(ctx *context.Context, call stack.Caller) func(
18+
item any,
19+
attempts int,
20+
nodeHintInfo *trace.NodeHintInfo,
21+
err error,
22+
)
1723
onWait func() func(item any, err error)
1824
OnChange func(Stats)
1925
}

internal/query/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -685,11 +685,16 @@ func poolTrace(t *trace.Query) *pool.Trace {
685685
onDone(err)
686686
}
687687
},
688-
OnGet: func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) {
688+
OnGet: func(ctx *context.Context, call stack.Caller) func(
689+
item any,
690+
attempts int,
691+
hint *trace.NodeHintInfo,
692+
err error,
693+
) {
689694
onDone := trace.QueryOnPoolGet(t, ctx, call)
690695

691-
return func(item any, attempts int, err error) {
692-
onDone(item.(*Session), attempts, err) //nolint:forcetypeassert
696+
return func(item any, attempts int, hint *trace.NodeHintInfo, err error) {
697+
onDone(item.(*Session), attempts, hint, err) //nolint:forcetypeassert
693698
}
694699
},
695700
OnChange: func(stats pool.Stats) {

internal/table/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,16 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
7878
onDone(err)
7979
}
8080
},
81-
OnGet: func(ctx *context.Context, call stack.Caller) func(item any, attempts int, err error) {
81+
OnGet: func(ctx *context.Context, call stack.Caller) func(
82+
item any,
83+
attempts int,
84+
hintInfo *trace.NodeHintInfo,
85+
err error,
86+
) {
8287
onDone := trace.TableOnPoolGet(config.Trace(), ctx, call)
8388

84-
return func(item any, attempts int, err error) {
85-
onDone(item.(*Session), attempts, err) //nolint:forcetypeassert
89+
return func(item any, attempts int, hintInfo *trace.NodeHintInfo, err error) {
90+
onDone(item.(*Session), attempts, hintInfo, err) //nolint:forcetypeassert
8691
}
8792
},
8893
OnWith: func(ctx *context.Context, call stack.Caller) func(attempts int, err error) {

metrics/query.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ func query(config Config) (t trace.Query) {
3333
}
3434
}
3535
}
36+
{
37+
nodeHintMiss := poolConfig.CounterVec("node_hint_miss", "preferred_node_id", "session_node_id")
38+
t.OnPoolGet = func(info trace.QueryPoolGetStartInfo) func(trace.QueryPoolGetDoneInfo) {
39+
return func(info trace.QueryPoolGetDoneInfo) {
40+
if poolConfig.Details()&trace.QueryPoolEvents != 0 {
41+
if info.NodeHintInfo != nil {
42+
nodeHintMiss.With(map[string]string{
43+
"preferred_node_id": idToString(info.NodeHintInfo.PreferredNodeID),
44+
"session_node_id": idToString(info.NodeHintInfo.SessionNodeID),
45+
}).Inc()
46+
}
47+
}
48+
}
49+
}
50+
}
3651
{
3752
sizeConfig := poolConfig.WithSystem("size")
3853
limit := sizeConfig.GaugeVec("limit")

metrics/table.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func table(config Config) (t trace.Table) {
2020
get := config.CounterVec("get")
2121
put := config.CounterVec("put")
2222
with := config.GaugeVec("with")
23+
nodeHintMiss := config.CounterVec("node_hint_miss", "preferred_node_id", "session_node_id")
2324
t.OnInit = func(info trace.TableInitStartInfo) func(trace.TableInitDoneInfo) {
2425
return func(info trace.TableInitDoneInfo) {
2526
if config.Details()&trace.TableEvents != 0 {
@@ -58,8 +59,16 @@ func table(config Config) (t trace.Table) {
5859
}
5960
t.OnPoolGet = func(info trace.TablePoolGetStartInfo) func(trace.TablePoolGetDoneInfo) {
6061
return func(info trace.TablePoolGetDoneInfo) {
61-
if info.Error == nil && config.Details()&trace.TablePoolEvents != 0 {
62-
get.With(nil).Inc()
62+
if config.Details()&trace.TablePoolEvents != 0 {
63+
if info.Error == nil {
64+
get.With(nil).Inc()
65+
}
66+
if info.NodeHintInfo != nil {
67+
nodeHintMiss.With(map[string]string{
68+
"preferred_node_id": idToString(info.NodeHintInfo.PreferredNodeID),
69+
"session_node_id": idToString(info.NodeHintInfo.SessionNodeID),
70+
}).Inc()
71+
}
6372
}
6473
}
6574
}

trace/node_hint_info.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package trace
2+
3+
type NodeHintInfo struct {
4+
PreferredNodeID uint32
5+
SessionNodeID uint32
6+
}

trace/query.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -591,9 +591,10 @@ type (
591591
}
592592
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
593593
QueryPoolGetDoneInfo struct {
594-
Session sessionInfo
595-
Attempts int
596-
Error error
594+
Session sessionInfo
595+
Attempts int
596+
NodeHintInfo *NodeHintInfo
597+
Error error
597598
}
598599
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
599600
QueryPoolChange struct {

0 commit comments

Comments
 (0)