Skip to content

Commit 5666cb0

Browse files
committed
node hinting workload
1 parent b6e2e39 commit 5666cb0

File tree

12 files changed

+852
-14
lines changed

12 files changed

+852
-14
lines changed

tests/slo/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ go 1.24.3
55
toolchain go1.24.10
66

77
require (
8+
github.com/prometheus/client_golang v1.3.0
9+
github.com/prometheus/common v0.7.0
810
github.com/ydb-platform/gorm-driver v0.1.3
911
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0
1012
github.com/ydb-platform/ydb-go-sdk/v3 v3.67.0

tests/slo/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,6 +1905,7 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
19051905
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
19061906
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
19071907
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
1908+
github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc=
19081909
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
19091910
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
19101911
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
@@ -1918,6 +1919,7 @@ github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk
19181919
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
19191920
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
19201921
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
1922+
github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY=
19211923
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
19221924
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
19231925
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=

tests/slo/internal/config/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ type Config struct {
3434
Time int
3535
ShutdownTime int
3636

37-
BatchSize int
37+
BatchSize int
38+
PrometheusEndpoint string
3839
}
3940

4041
func New() (*Config, error) {
@@ -95,6 +96,7 @@ func New() (*Config, error) {
9596

9697
fs.StringVar(&cfg.OTLPEndpoint, "otlp-endpoint", "", "OTLP HTTP endpoint for metrics")
9798
fs.IntVar(&cfg.ReportPeriod, "report-period", 250, "metrics reporting period in milliseconds")
99+
fs.StringVar(&cfg.PrometheusEndpoint, "prometheus-endpoint", "", "Prometheus endpoint")
98100

99101
fs.IntVar(&cfg.ReadRPS, "read-rps", 1000, "read RPS")
100102
fs.IntVar(&cfg.WriteRPS, "write-rps", 100, "write RPS")

tests/slo/internal/generator/generator.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,22 @@ const (
1313
MaxLength = 40
1414
)
1515

16-
type Generator struct {
16+
type Generator interface {
17+
Generate() (Row, error)
18+
}
19+
20+
type GeneratorImpl struct {
1721
currentID RowID
1822
mu sync.Mutex
1923
}
2024

21-
func New(id RowID) *Generator {
22-
return &Generator{
25+
func New(id RowID) *GeneratorImpl {
26+
return &GeneratorImpl{
2327
currentID: id,
2428
}
2529
}
2630

27-
func (g *Generator) Generate() (Row, error) {
31+
func (g *GeneratorImpl) Generate() (Row, error) {
2832
g.mu.Lock()
2933
id := g.currentID
3034
g.currentID++
@@ -37,15 +41,15 @@ func (g *Generator) Generate() (Row, error) {
3741
}
3842

3943
var err error
40-
e.PayloadStr, err = g.genPayloadString()
44+
e.PayloadStr, err = genPayloadString()
4145
if err != nil {
4246
return Row{}, err
4347
}
4448

4549
return e, nil
4650
}
4751

48-
func (g *Generator) genPayloadString() (*string, error) {
52+
func genPayloadString() (*string, error) {
4953
l := MinLength + rand.Intn(MaxLength-MinLength+1) //nolint:gosec // speed more important
5054

5155
sl := make([]byte, l)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package generator
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
)
7+
8+
type Range struct {
9+
Left uint64
10+
Right uint64
11+
}
12+
type SeededGenerator struct {
13+
rng *rand.Rand
14+
setRange *Range
15+
}
16+
17+
func NewSeeded(seed int64) *SeededGenerator {
18+
return &SeededGenerator{
19+
rng: rand.New(rand.NewSource(seed)),
20+
}
21+
}
22+
23+
func (g *SeededGenerator) ConstructRow() (Row, error) {
24+
e := Row{
25+
PayloadDouble: func(a float64) *float64 { return &a }(rand.Float64()), //nolint:gosec // speed more important
26+
PayloadTimestamp: func(a time.Time) *time.Time { return &a }(time.Now()),
27+
PayloadHash: func(a uint64) *uint64 { return &a }(rand.Uint64()), //nolint:gosec
28+
}
29+
30+
var err error
31+
e.PayloadStr, err = genPayloadString()
32+
if err != nil {
33+
return Row{}, err
34+
}
35+
36+
return e, nil
37+
}
38+
39+
func (g *SeededGenerator) Generate() (Row, error) {
40+
row, err := g.ConstructRow()
41+
if err != nil {
42+
return Row{}, err
43+
}
44+
if g.setRange == nil {
45+
row.ID = g.rng.Uint64()
46+
} else {
47+
row.ID = g.setRange.Left + g.rng.Uint64()%(g.setRange.Right-g.setRange.Left)
48+
}
49+
return row, nil
50+
}
51+
52+
func (g *SeededGenerator) SetRange(l uint64, r uint64) {
53+
g.setRange = &Range{
54+
Left: l,
55+
Right: r,
56+
}
57+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package node_hints
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"math/rand"
8+
"slices"
9+
"slo/internal/generator"
10+
"sync/atomic"
11+
"time"
12+
13+
"github.com/ydb-platform/ydb-go-sdk/v3"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
15+
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
17+
)
18+
19+
func describeTable(ctx context.Context, driver *ydb.Driver, tableName string) (desc options.Description, err error) {
20+
err = driver.Table().Do(ctx,
21+
func(ctx context.Context, session table.Session) (err error) {
22+
desc, err = session.DescribeTable(ctx, tableName,
23+
options.WithTableStats(),
24+
options.WithPartitionStats(),
25+
options.WithShardKeyBounds(),
26+
options.WithShardNodesInfo(),
27+
)
28+
return err
29+
},
30+
table.WithIdempotent(),
31+
)
32+
return desc, err
33+
}
34+
35+
type NodeSelector struct {
36+
LowerBounds []uint64
37+
UpperBounds []uint64
38+
NodeIDs []uint32
39+
}
40+
41+
func extractKey(v types.Value, side int) (uint64, error) {
42+
if types.IsNull(v) {
43+
if side == LEFT {
44+
return 0, nil
45+
} else {
46+
return ^uint64(0), nil
47+
}
48+
}
49+
parts, err := types.TupleItems(v)
50+
if err != nil {
51+
return 0, fmt.Errorf("extract tuple: %w", err)
52+
}
53+
54+
var res uint64
55+
if err := types.CastTo(parts[0], &res); err != nil {
56+
return 0, fmt.Errorf("cast to uint64: %w", err)
57+
}
58+
59+
return res, nil
60+
}
61+
62+
const (
63+
LEFT = iota
64+
RIGHT = iota
65+
)
66+
67+
func MakeNodeSelector(ctx context.Context, driver *ydb.Driver, tableName string) (*NodeSelector, error) {
68+
dsc, err := describeTable(ctx, driver, tableName)
69+
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
s := NodeSelector{}
75+
76+
for _, kr := range dsc.KeyRanges {
77+
l, err := extractKey(kr.From, LEFT)
78+
if err != nil {
79+
return nil, err
80+
}
81+
s.LowerBounds = append(s.LowerBounds, l)
82+
r, err := extractKey(kr.To, RIGHT)
83+
if err != nil {
84+
return nil, err
85+
}
86+
s.UpperBounds = append(s.UpperBounds, r)
87+
}
88+
89+
for i := range len(s.UpperBounds) - 1 {
90+
if s.UpperBounds[i] >= s.UpperBounds[i+1] {
91+
for _, b := range s.UpperBounds {
92+
log.Println(b)
93+
}
94+
log.Fatalf("boundaries are not sorted")
95+
}
96+
}
97+
98+
for _, ps := range dsc.Stats.PartitionStats {
99+
s.NodeIDs = append(s.NodeIDs, ps.LeaderNodeID)
100+
}
101+
return &s, nil
102+
}
103+
104+
func (s *NodeSelector) findNodeID(key uint64) uint32 {
105+
idx, found := slices.BinarySearch(s.UpperBounds, key)
106+
if found {
107+
idx++
108+
}
109+
return s.NodeIDs[idx]
110+
}
111+
112+
func (s *NodeSelector) WithNodeHint(ctx context.Context, key uint64) context.Context {
113+
if s == nil || len(s.NodeIDs) == 0 {
114+
return ctx
115+
}
116+
return ydb.WithPreferredNodeID(ctx, s.findNodeID(key))
117+
}
118+
119+
func (s *NodeSelector) GeneratePartitionKey(partitionId uint64) uint64 {
120+
l := s.UpperBounds[partitionId] - s.LowerBounds[partitionId]
121+
return s.LowerBounds[partitionId] + rand.Uint64()%l
122+
}
123+
124+
func RunUpdates(ctx context.Context, driver *ydb.Driver, tableName string, frequency time.Duration) (*atomic.Pointer[NodeSelector], error) {
125+
var ns atomic.Pointer[NodeSelector]
126+
updateSelector := func() error {
127+
selector, err := MakeNodeSelector(ctx, driver, tableName)
128+
if err != nil {
129+
return err
130+
}
131+
ns.Store(selector)
132+
return nil
133+
}
134+
135+
err := updateSelector()
136+
if err != nil {
137+
return nil, err
138+
} else {
139+
ticker := time.NewTicker(frequency)
140+
go func() {
141+
defer ticker.Stop()
142+
for {
143+
select {
144+
case <-ctx.Done():
145+
return
146+
case <-ticker.C:
147+
err = updateSelector()
148+
if err != nil {
149+
log.Printf("node hints update error: %v\n", err)
150+
}
151+
}
152+
}
153+
}()
154+
}
155+
return &ns, nil
156+
}
157+
158+
func (ns *NodeSelector) GetRandomNodeID(generator generator.Generator) (int, uint32) {
159+
r, err := generator.Generate()
160+
if err != nil {
161+
log.Panicf("GetRandomNodeID: generator.Generate failed: %v", err)
162+
}
163+
shift := r.ID % uint64(len(ns.NodeIDs))
164+
for id, nodeID := range ns.NodeIDs {
165+
if id == int(shift) {
166+
return id, nodeID
167+
}
168+
}
169+
log.Panicf("GetRandomNodeID: no nodeID found for shift: %d", shift)
170+
return 0, 0
171+
}

tests/slo/internal/workers/read.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,43 @@ func (w *Workers) Read(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter
3333
}
3434
}
3535

36+
func (w *Workers) ReadID() uint64 {
37+
if w.Gen == nil {
38+
return uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint: gosec
39+
}
40+
row, err := w.Gen.Generate()
41+
if err != nil {
42+
log.Panicf("generate error: %v", err)
43+
}
44+
return row.ID
45+
}
46+
47+
func (w *Workers) ReadIDs() []uint64 {
48+
ids := make([]uint64, 0, w.cfg.BatchSize)
49+
for range w.cfg.BatchSize {
50+
if w.Gen == nil {
51+
ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint: gosec
52+
} else {
53+
row, err := w.Gen.Generate()
54+
if err != nil {
55+
log.Panicf("generate error: %v", err)
56+
}
57+
ids = append(ids, row.ID)
58+
}
59+
}
60+
return ids
61+
}
62+
3663
func (w *Workers) read(ctx context.Context) error {
3764
var m metrics.Span
3865
var attempts int
3966
var err error
4067
if w.s != nil {
41-
id := uint64(rand.Intn(int(w.cfg.InitialDataCount))) //nolint:gosec // speed more important
68+
id := w.ReadID()
4269
m = w.m.Start(metrics.OperationTypeRead)
4370
_, attempts, err = w.s.Read(ctx, id)
4471
} else {
45-
ids := make([]uint64, 0, w.cfg.BatchSize)
46-
for range w.cfg.BatchSize {
47-
ids = append(ids, uint64(rand.Intn(int(w.cfg.InitialDataCount)))) //nolint:gosec
48-
}
72+
ids := w.ReadIDs()
4973
m = w.m.Start(metrics.OperationTypeRead)
5074
_, attempts, err = w.sb.ReadBatch(ctx, ids)
5175
}

tests/slo/internal/workers/workers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Workers struct {
2424
s ReadWriter
2525
sb BatchReadWriter
2626
m *metrics.Metrics
27+
Gen generator.Generator
2728
}
2829

2930
func New(cfg *config.Config, s ReadWriter, ref, label, jobName string) (*Workers, error) {

tests/slo/internal/workers/write.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"slo/internal/metrics"
1212
)
1313

14-
func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen *generator.Generator) {
14+
func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limiter, gen generator.Generator) {
1515
defer wg.Done()
1616
for {
1717
select {
@@ -33,7 +33,7 @@ func (w *Workers) Write(ctx context.Context, wg *sync.WaitGroup, rl *rate.Limite
3333
}
3434
}
3535

36-
func (w *Workers) write(ctx context.Context, gen *generator.Generator) (finalErr error) {
36+
func (w *Workers) write(ctx context.Context, gen generator.Generator) (finalErr error) {
3737
m := w.m.Start(metrics.OperationTypeWrite)
3838
var attempts int
3939
if w.s != nil {

0 commit comments

Comments
 (0)