Skip to content

Commit 1b2f8d4

Browse files
authored
Merge pull request #1339 from ydb-platform/xsync-once
refactoring of xsync.OnceValue
2 parents 689b333 + 8f56f1b commit 1b2f8d4

File tree

3 files changed

+58
-44
lines changed

3 files changed

+58
-44
lines changed

driver.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -185,44 +185,44 @@ func (d *Driver) Secure() bool {
185185

186186
// Table returns table client
187187
func (d *Driver) Table() table.Client {
188-
return d.table.Get()
188+
return d.table.Must()
189189
}
190190

191191
// Query returns query client
192192
//
193193
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
194194
func (d *Driver) Query() *internalQuery.Client {
195-
return d.query.Get()
195+
return d.query.Must()
196196
}
197197

198198
// Scheme returns scheme client
199199
func (d *Driver) Scheme() scheme.Client {
200-
return d.scheme.Get()
200+
return d.scheme.Must()
201201
}
202202

203203
// Coordination returns coordination client
204204
func (d *Driver) Coordination() coordination.Client {
205-
return d.coordination.Get()
205+
return d.coordination.Must()
206206
}
207207

208208
// Ratelimiter returns ratelimiter client
209209
func (d *Driver) Ratelimiter() ratelimiter.Client {
210-
return d.ratelimiter.Get()
210+
return d.ratelimiter.Must()
211211
}
212212

213213
// Discovery returns discovery client
214214
func (d *Driver) Discovery() discovery.Client {
215-
return d.discovery.Get()
215+
return d.discovery.Must()
216216
}
217217

218218
// Scripting returns scripting client
219219
func (d *Driver) Scripting() scripting.Client {
220-
return d.scripting.Get()
220+
return d.scripting.Must()
221221
}
222222

223223
// Topic returns topic client
224224
func (d *Driver) Topic() topic.Client {
225-
return d.topic.Get()
225+
return d.topic.Must()
226226
}
227227

228228
// Open connects to database by DSN and return driver runtime holder
@@ -407,7 +407,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
407407
return xerrors.WithStackTrace(err)
408408
}
409409

410-
d.table = xsync.OnceValue(func() *internalTable.Client {
410+
d.table = xsync.OnceValue(func() (*internalTable.Client, error) {
411411
return internalTable.New(xcontext.ValueOnly(ctx),
412412
d.balancer,
413413
tableConfig.New(
@@ -419,10 +419,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
419419
d.tableOptions...,
420420
)...,
421421
),
422-
)
422+
), nil
423423
})
424424

425-
d.query = xsync.OnceValue(func() *internalQuery.Client {
425+
d.query = xsync.OnceValue(func() (*internalQuery.Client, error) {
426426
return internalQuery.New(xcontext.ValueOnly(ctx),
427427
d.balancer,
428428
queryConfig.New(
@@ -434,13 +434,13 @@ func (d *Driver) connect(ctx context.Context) (err error) {
434434
d.queryOptions...,
435435
)...,
436436
),
437-
)
437+
), nil
438438
})
439439
if err != nil {
440440
return xerrors.WithStackTrace(err)
441441
}
442442

443-
d.scheme = xsync.OnceValue(func() *internalScheme.Client {
443+
d.scheme = xsync.OnceValue(func() (*internalScheme.Client, error) {
444444
return internalScheme.New(xcontext.ValueOnly(ctx),
445445
d.balancer,
446446
schemeConfig.New(
@@ -453,10 +453,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
453453
d.schemeOptions...,
454454
)...,
455455
),
456-
)
456+
), nil
457457
})
458458

459-
d.coordination = xsync.OnceValue(func() *internalCoordination.Client {
459+
d.coordination = xsync.OnceValue(func() (*internalCoordination.Client, error) {
460460
return internalCoordination.New(xcontext.ValueOnly(ctx),
461461
d.balancer,
462462
coordinationConfig.New(
@@ -468,10 +468,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
468468
d.coordinationOptions...,
469469
)...,
470470
),
471-
)
471+
), nil
472472
})
473473

474-
d.ratelimiter = xsync.OnceValue(func() *internalRatelimiter.Client {
474+
d.ratelimiter = xsync.OnceValue(func() (*internalRatelimiter.Client, error) {
475475
return internalRatelimiter.New(xcontext.ValueOnly(ctx),
476476
d.balancer,
477477
ratelimiterConfig.New(
@@ -483,10 +483,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
483483
d.ratelimiterOptions...,
484484
)...,
485485
),
486-
)
486+
), nil
487487
})
488488

489-
d.discovery = xsync.OnceValue(func() *internalDiscovery.Client {
489+
d.discovery = xsync.OnceValue(func() (*internalDiscovery.Client, error) {
490490
return internalDiscovery.New(xcontext.ValueOnly(ctx),
491491
d.pool.Get(endpoint.New(d.config.Endpoint())),
492492
discoveryConfig.New(
@@ -502,10 +502,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
502502
d.discoveryOptions...,
503503
)...,
504504
),
505-
)
505+
), nil
506506
})
507507

508-
d.scripting = xsync.OnceValue(func() *internalScripting.Client {
508+
d.scripting = xsync.OnceValue(func() (*internalScripting.Client, error) {
509509
return internalScripting.New(xcontext.ValueOnly(ctx),
510510
d.balancer,
511511
scriptingConfig.New(
@@ -517,10 +517,10 @@ func (d *Driver) connect(ctx context.Context) (err error) {
517517
d.scriptingOptions...,
518518
)...,
519519
),
520-
)
520+
), nil
521521
})
522522

523-
d.topic = xsync.OnceValue(func() *topicclientinternal.Client {
523+
d.topic = xsync.OnceValue(func() (*topicclientinternal.Client, error) {
524524
return topicclientinternal.New(xcontext.ValueOnly(ctx),
525525
d.balancer,
526526
d.config.Credentials(),
@@ -532,7 +532,7 @@ func (d *Driver) connect(ctx context.Context) (err error) {
532532
},
533533
d.topicOptions...,
534534
)...,
535-
)
535+
), nil
536536
})
537537

538538
return nil

internal/xsync/once.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ func OnceFunc(f func(ctx context.Context) error) func(ctx context.Context) error
2020
}
2121

2222
type Once[T closer.Closer] struct {
23-
f func() T
23+
f func() (T, error)
2424
once sync.Once
2525
mutex sync.RWMutex
2626
t T
27+
err error
2728
}
2829

29-
func OnceValue[T closer.Closer](f func() T) *Once[T] {
30+
func OnceValue[T closer.Closer](f func() (T, error)) *Once[T] {
3031
return &Once[T]{f: f}
3132
}
3233

@@ -46,16 +47,25 @@ func (v *Once[T]) Close(ctx context.Context) (err error) {
4647
return nil
4748
}
4849

49-
func (v *Once[T]) Get() T {
50+
func (v *Once[T]) Get() (T, error) {
5051
v.once.Do(func() {
5152
v.mutex.Lock()
5253
defer v.mutex.Unlock()
5354

54-
v.t = v.f()
55+
v.t, v.err = v.f()
5556
})
5657

5758
v.mutex.RLock()
5859
defer v.mutex.RUnlock()
5960

60-
return v.t
61+
return v.t, v.err
62+
}
63+
64+
func (v *Once[T]) Must() T {
65+
t, err := v.Get()
66+
if err != nil {
67+
panic(err)
68+
}
69+
70+
return t
6171
}

internal/xsync/once_test.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,49 +45,53 @@ func TestOnceValue(t *testing.T) {
4545
ctx := xtest.Context(t)
4646
t.Run("Race", func(t *testing.T) {
4747
counter := 0
48-
once := OnceValue(func() *testCloser {
48+
once := OnceValue(func() (*testCloser, error) {
4949
counter++
5050

51-
return &testCloser{value: counter}
51+
return &testCloser{value: counter}, nil
5252
})
5353
var wg sync.WaitGroup
5454
wg.Add(1000)
5555
for range make([]struct{}, 1000) {
5656
go func() {
5757
defer wg.Done()
58-
v := once.Get()
58+
v, err := once.Get()
59+
require.NoError(t, err)
5960
require.Equal(t, 1, v.value)
6061
}()
6162
}
6263
wg.Wait()
6364
})
6465
t.Run("GetBeforeClose", func(t *testing.T) {
6566
constCloseErr := errors.New("")
66-
once := OnceValue(func() *testCloser {
67+
once := OnceValue(func() (*testCloser, error) {
6768
return &testCloser{
6869
inited: true,
6970
closeErr: constCloseErr,
70-
}
71+
}, nil
72+
})
73+
require.NotPanics(t, func() {
74+
v := once.Must()
75+
require.True(t, v.inited)
76+
require.False(t, v.closed)
77+
err := once.Close(ctx)
78+
require.ErrorIs(t, err, constCloseErr)
79+
require.True(t, v.inited)
80+
require.True(t, v.closed)
7181
})
72-
v := once.Get()
73-
require.True(t, v.inited)
74-
require.False(t, v.closed)
75-
err := once.Close(ctx)
76-
require.ErrorIs(t, err, constCloseErr)
77-
require.True(t, v.inited)
78-
require.True(t, v.closed)
7982
})
8083
t.Run("CloseBeforeGet", func(t *testing.T) {
8184
constCloseErr := errors.New("")
82-
once := OnceValue(func() *testCloser {
85+
once := OnceValue(func() (*testCloser, error) {
8386
return &testCloser{
8487
inited: true,
8588
closeErr: constCloseErr,
86-
}
89+
}, nil
8790
})
8891
err := once.Close(ctx)
8992
require.NoError(t, err)
90-
v := once.Get()
93+
v, err := once.Get()
94+
require.NoError(t, err)
9195
require.Nil(t, v)
9296
})
9397
}

0 commit comments

Comments
 (0)