Skip to content

Commit 995c4d2

Browse files
authored
Topic consumer availability period (#1943)
1 parent 359cf5f commit 995c4d2

File tree

14 files changed

+216
-30
lines changed

14 files changed

+216
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Fixed deadlock in `Endpoint.String()` method
2+
* Added the `AvailabilityPeriod` to the Consumer type in topics
23

34
## v3.118.3
45
* Fixed `context` checking in `ydb.Open`

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ require (
5858
github.com/sethvargo/go-retry v0.3.0 // indirect
5959
github.com/syndtr/goleveldb v1.0.0 // indirect
6060
github.com/yandex-cloud/go-genproto v0.0.0-20220815090733-4c139c0154e2 // indirect
61-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 // indirect
61+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633 // indirect
6262
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 // indirect
6363
go.uber.org/multierr v1.11.0 // indirect
6464
golang.org/x/crypto v0.36.0 // indirect

examples/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,8 +2043,8 @@ github.com/ydb-platform/gorm-driver v0.1.3 h1:uewwScbRuCixNPC0LF7gDKvWcB13/iLj76
20432043
github.com/ydb-platform/gorm-driver v0.1.3/go.mod h1:49cSoG5J18muQTiKj4StL2dHs1/dB94OitnHOvetK24=
20442044
github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKVPw=
20452045
github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc=
2046-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 h1:SKqSRP6/ocY2Z4twOqKEKxpmawVTHTvQiom7hrU6jt0=
2047-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
2046+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633 h1:xnHt1ratkQUO0m2/A4py3yMYZI5BOPoZZppbcdybR10=
2047+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
20482048
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0 h1:JxSvw+Moont8qCmibP2MjSEIHfkWJLkw0fHZemAk+d0=
20492049
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0/go.mod h1:YzCPoNrTbrXZg9bO2YkbjI6eQLkaRIE9Bq8ponu0g8A=
20502050
github.com/ydb-platform/ydb-go-sdk-prometheus/v2 v2.1.2 h1:/kDHhXMNGjsqy+SZ3Zn7gZ2ziZekUJLnPXqwy6vyAX8=

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/golang-jwt/jwt/v4 v4.5.2
77
github.com/google/uuid v1.6.0
88
github.com/jonboulle/clockwork v0.5.0
9-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9
9+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633
1010
go.uber.org/goleak v1.3.0
1111
go.uber.org/zap v1.27.0
1212
golang.org/x/sync v0.11.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
7676
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
7777
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
7878
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
79-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 h1:SKqSRP6/ocY2Z4twOqKEKxpmawVTHTvQiom7hrU6jt0=
80-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
79+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633 h1:xnHt1ratkQUO0m2/A4py3yMYZI5BOPoZZppbcdybR10=
80+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
8181
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
8282
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
8383
go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE=

internal/grpcwrapper/rawtopic/alter_topic.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rawtopic
22

33
import (
44
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
5+
"google.golang.org/protobuf/types/known/emptypb"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawoptional"
78
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -68,19 +69,34 @@ func (r *AlterTopicResult) FromProto(proto *Ydb_Topic.AlterTopicResponse) error
6869
}
6970

7071
type AlterConsumer struct {
71-
Name string
72-
SetImportant rawoptional.Bool
73-
SetReadFrom rawoptional.Time
74-
SetSupportedCodecs rawtopiccommon.SupportedCodecs
75-
AlterAttributes map[string]string
72+
Name string
73+
SetImportant rawoptional.Bool
74+
SetReadFrom rawoptional.Time
75+
SetSupportedCodecs rawtopiccommon.SupportedCodecs
76+
SetAvailabilityPeriod rawoptional.Duration
77+
ResetAvailabilityPeriod bool
78+
AlterAttributes map[string]string
7679
}
7780

7881
func (c *AlterConsumer) ToProto() *Ydb_Topic.AlterConsumer {
79-
return &Ydb_Topic.AlterConsumer{
82+
res := &Ydb_Topic.AlterConsumer{
8083
Name: c.Name,
8184
SetImportant: c.SetImportant.ToProto(),
8285
SetReadFrom: c.SetReadFrom.ToProto(),
8386
SetSupportedCodecs: c.SetSupportedCodecs.ToProto(),
8487
AlterAttributes: c.AlterAttributes,
8588
}
89+
90+
if c.SetAvailabilityPeriod.HasValue {
91+
res.AvailabilityPeriodAction = &Ydb_Topic.AlterConsumer_SetAvailabilityPeriod{
92+
SetAvailabilityPeriod: c.SetAvailabilityPeriod.ToProto(),
93+
}
94+
}
95+
if c.ResetAvailabilityPeriod {
96+
res.AvailabilityPeriodAction = &Ydb_Topic.AlterConsumer_ResetAvailabilityPeriod{
97+
ResetAvailabilityPeriod: &emptypb.Empty{},
98+
}
99+
}
100+
101+
return res
86102
}

internal/grpcwrapper/rawtopic/controlplane_types.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,31 @@ var (
1717
)
1818

1919
type Consumer struct {
20-
Name string
21-
Important bool
22-
SupportedCodecs rawtopiccommon.SupportedCodecs
23-
ReadFrom rawoptional.Time
24-
Attributes map[string]string
20+
Name string
21+
Important bool
22+
SupportedCodecs rawtopiccommon.SupportedCodecs
23+
ReadFrom rawoptional.Time
24+
AvailabilityPeriod rawoptional.Duration
25+
Attributes map[string]string
2526
}
2627

2728
func (c *Consumer) MustFromProto(consumer *Ydb_Topic.Consumer) {
2829
c.Name = consumer.GetName()
2930
c.Important = consumer.GetImportant()
3031
c.Attributes = consumer.GetAttributes()
3132
c.ReadFrom.MustFromProto(consumer.GetReadFrom())
33+
c.AvailabilityPeriod.MustFromProto(consumer.GetAvailabilityPeriod())
3234
c.SupportedCodecs.MustFromProto(consumer.GetSupportedCodecs())
3335
}
3436

3537
func (c *Consumer) ToProto() *Ydb_Topic.Consumer {
3638
return &Ydb_Topic.Consumer{
37-
Name: c.Name,
38-
Important: c.Important,
39-
ReadFrom: c.ReadFrom.ToProto(),
40-
SupportedCodecs: c.SupportedCodecs.ToProto(),
41-
Attributes: c.Attributes,
39+
Name: c.Name,
40+
Important: c.Important,
41+
ReadFrom: c.ReadFrom.ToProto(),
42+
AvailabilityPeriod: c.AvailabilityPeriod.ToProto(),
43+
SupportedCodecs: c.SupportedCodecs.ToProto(),
44+
Attributes: c.Attributes,
4245
}
4346
}
4447

tests/slo/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.23.9
44

55
require (
66
github.com/prometheus/client_golang v1.14.0
7+
github.com/prometheus/client_model v0.6.0
78
github.com/ydb-platform/gorm-driver v0.1.3
89
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0
910
github.com/ydb-platform/ydb-go-sdk/v3 v3.67.0
@@ -28,12 +29,11 @@ require (
2829
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
2930
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3031
github.com/modern-go/reflect2 v1.0.2 // indirect
31-
github.com/prometheus/client_model v0.6.0 // indirect
3232
github.com/prometheus/common v0.37.0 // indirect
3333
github.com/prometheus/procfs v0.8.0 // indirect
3434
github.com/syndtr/goleveldb v1.0.0 // indirect
3535
github.com/yandex-cloud/go-genproto v0.0.0-20211115083454-9ca41db5ed9e // indirect
36-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 // indirect
36+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633 // indirect
3737
github.com/ydb-platform/ydb-go-yc v0.12.1 // indirect
3838
github.com/ydb-platform/ydb-go-yc-metadata v0.6.1 // indirect
3939
golang.org/x/net v0.38.0 // indirect

tests/slo/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,8 +2024,8 @@ github.com/ydb-platform/gorm-driver v0.1.3 h1:uewwScbRuCixNPC0LF7gDKvWcB13/iLj76
20242024
github.com/ydb-platform/gorm-driver v0.1.3/go.mod h1:49cSoG5J18muQTiKj4StL2dHs1/dB94OitnHOvetK24=
20252025
github.com/ydb-platform/xorm v0.0.3 h1:MXk42lANB6r/MMLg/XdJfyXJycGUDlCeLiMlLGDKVPw=
20262026
github.com/ydb-platform/xorm v0.0.3/go.mod h1:hFsU7EUF0o3S+l5c0eyP2yPVjJ0d4gsFdqCsyazzwBc=
2027-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9 h1:SKqSRP6/ocY2Z4twOqKEKxpmawVTHTvQiom7hrU6jt0=
2028-
github.com/ydb-platform/ydb-go-genproto v0.0.0-20250911135631-b3beddd517d9/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
2027+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633 h1:xnHt1ratkQUO0m2/A4py3yMYZI5BOPoZZppbcdybR10=
2028+
github.com/ydb-platform/ydb-go-genproto v0.0.0-20251124083616-76378b253633/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
20292029
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0 h1:JxSvw+Moont8qCmibP2MjSEIHfkWJLkw0fHZemAk+d0=
20302030
github.com/ydb-platform/ydb-go-sdk-auth-environ v0.3.0/go.mod h1:YzCPoNrTbrXZg9bO2YkbjI6eQLkaRIE9Bq8ponu0g8A=
20312031
github.com/ydb-platform/ydb-go-yc v0.12.1 h1:qw3Fa+T81+Kpu5Io2vYHJOwcrYrVjgJlT6t/0dOXJrA=

topic/example_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"log"
88
"os"
9+
"time"
910

1011
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
@@ -68,6 +69,73 @@ func Example_alterTopic() {
6869
}
6970
}
7071

72+
func Example_createTopicWithConsumerAvailabilityPeriod() {
73+
ctx := context.TODO()
74+
connectionString := os.Getenv("YDB_CONNECTION_STRING")
75+
if connectionString == "" {
76+
connectionString = "grpc://localhost:2136/local"
77+
}
78+
db, err := ydb.Open(ctx, connectionString)
79+
if err != nil {
80+
log.Printf("failed connect: %v", err)
81+
82+
return
83+
}
84+
defer db.Close(ctx) // cleanup resources
85+
86+
// Create topic with consumer that has 24-hour availability period
87+
// Messages for this consumer won't expire for at least 24 hours even if not committed
88+
availabilityPeriod := 24 * time.Hour
89+
err = db.Topic().Create(ctx, "topic-path",
90+
topicoptions.CreateWithConsumer(topictypes.Consumer{
91+
Name: "my-consumer",
92+
Important: true,
93+
AvailabilityPeriod: &availabilityPeriod, // Messages available for at least 24 hours
94+
SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip},
95+
}),
96+
)
97+
if err != nil {
98+
log.Printf("failed create topic: %v", err)
99+
100+
return
101+
}
102+
}
103+
104+
func Example_alterConsumerAvailabilityPeriod() {
105+
ctx := context.TODO()
106+
connectionString := os.Getenv("YDB_CONNECTION_STRING")
107+
if connectionString == "" {
108+
connectionString = "grpc://localhost:2136/local"
109+
}
110+
db, err := ydb.Open(ctx, connectionString)
111+
if err != nil {
112+
log.Printf("failed connect: %v", err)
113+
114+
return
115+
}
116+
defer db.Close(ctx) // cleanup resources
117+
118+
// Set availability period to 48 hours for existing consumer
119+
err = db.Topic().Alter(ctx, "topic-path",
120+
topicoptions.AlterConsumerWithAvailabilityPeriod("my-consumer", 48*time.Hour),
121+
)
122+
if err != nil {
123+
log.Printf("failed alter consumer availability period: %v", err)
124+
125+
return
126+
}
127+
128+
// Reset availability period to default value
129+
err = db.Topic().Alter(ctx, "topic-path",
130+
topicoptions.AlterConsumerResetAvailabilityPeriod("my-consumer"),
131+
)
132+
if err != nil {
133+
log.Printf("failed reset consumer availability period: %v", err)
134+
135+
return
136+
}
137+
}
138+
71139
func Example_describeTopic() {
72140
ctx := context.TODO()
73141
connectionString := os.Getenv("YDB_CONNECTION_STRING")

0 commit comments

Comments
 (0)