Skip to content

Commit c21b7a5

Browse files
committed
describe topic partition stats
1 parent 2b1698d commit c21b7a5

File tree

6 files changed

+153
-5
lines changed

6 files changed

+153
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `topicoptions.IncludePartitionStats()` for `Topic().Describe()` in order to get partition stats from server
2+
13
## v3.106.1
24
* Dropped `internal/allocator` package and all usages of it for further switch (test) protobuf opaque API
35

internal/grpcwrapper/rawtopic/describe_topic.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ import (
1717
type DescribeTopicRequest struct {
1818
OperationParams rawydb.OperationParams
1919
Path string
20+
IncludeStats bool
2021
}
2122

2223
func (req *DescribeTopicRequest) ToProto() *Ydb_Topic.DescribeTopicRequest {
2324
return &Ydb_Topic.DescribeTopicRequest{
2425
OperationParams: req.OperationParams.ToProto(),
2526
Path: req.Path,
27+
IncludeStats: req.IncludeStats,
2628
}
2729
}
2830

@@ -63,7 +65,10 @@ func (res *DescribeTopicResult) FromProto(response operation.Response) error {
6365
protoPartitions := protoResult.GetPartitions()
6466
res.Partitions = make([]PartitionInfo, len(protoPartitions))
6567
for i, protoPartition := range protoPartitions {
66-
res.Partitions[i].mustFromProto(protoPartition)
68+
err := res.Partitions[i].FromProto(protoPartition)
69+
if err != nil {
70+
return err
71+
}
6772
}
6873

6974
res.RetentionPeriod = protoResult.GetRetentionPeriod().AsDuration()
@@ -93,12 +98,15 @@ type PartitionInfo struct {
9398
Active bool
9499
ChildPartitionIDs []int64
95100
ParentPartitionIDs []int64
101+
PartitionStats PartitionStats
96102
}
97103

98-
func (pi *PartitionInfo) mustFromProto(proto *Ydb_Topic.DescribeTopicResult_PartitionInfo) {
104+
func (pi *PartitionInfo) FromProto(proto *Ydb_Topic.DescribeTopicResult_PartitionInfo) error {
99105
pi.PartitionID = proto.GetPartitionId()
100106
pi.Active = proto.GetActive()
101107

102108
pi.ChildPartitionIDs = clone.Int64Slice(proto.GetChildPartitionIds())
103109
pi.ParentPartitionIDs = clone.Int64Slice(proto.GetParentPartitionIds())
110+
111+
return pi.PartitionStats.FromProto(proto.GetPartitionStats())
104112
}

tests/integration/topic_client_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package integration
55

66
import (
7+
"bytes"
78
"context"
89
"io"
910
"os"
@@ -139,6 +140,95 @@ func TestTopicDescribe(t *testing.T) {
139140
require.Equal(t, expected, topicDesc)
140141
}
141142

143+
func TestTopicDescribePartitionStats(t *testing.T) {
144+
ctx := xtest.Context(t)
145+
db := connect(t)
146+
topicName := "test-topic-" + t.Name()
147+
148+
var (
149+
supportedCodecs = []topictypes.Codec{topictypes.CodecRaw}
150+
minActivePartitions = int64(1)
151+
)
152+
153+
_ = db.Topic().Drop(ctx, topicName)
154+
err := db.Topic().Create(ctx, topicName,
155+
topicoptions.CreateWithSupportedCodecs(supportedCodecs...),
156+
topicoptions.CreateWithMinActivePartitions(minActivePartitions),
157+
)
158+
require.NoError(t, err)
159+
160+
writer, err := db.Topic().StartWriter(topicName, topicoptions.WithWriterWaitServerAck(true))
161+
require.NoError(t, err)
162+
topicMessage := topicwriter.Message{
163+
Data: bytes.NewReader([]byte{128}),
164+
}
165+
err = writer.Write(ctx, topicMessage)
166+
require.NoError(t, err)
167+
168+
topicDesc, err := db.Topic().Describe(ctx, topicName, topicoptions.IncludePartitionStats())
169+
require.NoError(t, err)
170+
171+
expected := topictypes.TopicDescription{
172+
Path: topicName,
173+
PartitionSettings: topictypes.PartitionSettings{
174+
MinActivePartitions: minActivePartitions,
175+
},
176+
Partitions: []topictypes.PartitionInfo{
177+
{
178+
PartitionID: 0,
179+
Active: true,
180+
PartitionStats: topictypes.PartitionStats{
181+
PartitionsOffset: topictypes.OffsetRange{
182+
Start: 0,
183+
End: 1,
184+
},
185+
BytesWritten: topictypes.MultipleWindowsStat{
186+
PerMinute: 1,
187+
PerHour: 1,
188+
PerDay: 1,
189+
},
190+
},
191+
},
192+
},
193+
RetentionPeriod: 86400000000000,
194+
RetentionStorageMB: 0,
195+
SupportedCodecs: supportedCodecs,
196+
PartitionWriteBurstBytes: 1048576,
197+
PartitionWriteSpeedBytesPerSecond: 1048576,
198+
Attributes: nil,
199+
Consumers: []topictypes.Consumer{},
200+
MeteringMode: topictypes.MeteringModeUnspecified,
201+
}
202+
203+
requireAndCleanSubset := func(checked *map[string]string, subset *map[string]string) {
204+
t.Helper()
205+
for k, subValue := range *subset {
206+
checkedValue, ok := (*checked)[k]
207+
require.True(t, ok, k)
208+
require.Equal(t, subValue, checkedValue)
209+
}
210+
*checked = nil
211+
*subset = nil
212+
}
213+
214+
topicDesc.Partitions[0].PartitionStats.LastWriteTime = nil
215+
bw := &topicDesc.Partitions[0].PartitionStats.BytesWritten
216+
sign := func(x int64) int64 {
217+
if x > 0 {
218+
return 1
219+
}
220+
return 0
221+
}
222+
topicDesc.Partitions[0].PartitionStats.BytesWritten = topictypes.MultipleWindowsStat{
223+
PerMinute: sign(bw.PerMinute),
224+
PerHour: sign(bw.PerHour),
225+
PerDay: sign(bw.PerDay),
226+
}
227+
228+
requireAndCleanSubset(&topicDesc.Attributes, &expected.Attributes)
229+
require.Equal(t, expected, topicDesc)
230+
}
231+
142232
func TestDescribeTopicConsumer(t *testing.T) {
143233
ctx := xtest.Context(t)
144234
db := connect(t)

topic/topicoptions/topicoptions_describe.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,21 @@ package topicoptions
22

33
import "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
44

5-
// DescribeOption type for options of describe method. Not used now.
5+
// DescribeOption type for options of describe method.
66
type DescribeOption func(req *rawtopic.DescribeTopicRequest)
77

8+
// IncludePartitionStats additionally fetches DescribeTopicResult.PartitionInfo.PartitionSettings from server
9+
func IncludePartitionStats() DescribeOption {
10+
return func(req *rawtopic.DescribeTopicRequest) {
11+
req.IncludeStats = true
12+
}
13+
}
14+
815
// DescribeConsumerOption type for options of describe consumer method.
916
type DescribeConsumerOption func(req *rawtopic.DescribeConsumerRequest)
1017

18+
// IncludeConsumerStats additionally fetches
19+
// DescribeConsumerResult.DescribeConsumerResultPartitionInfo.PartitionConsumerStats from server
1120
func IncludeConsumerStats() DescribeConsumerOption {
1221
return func(req *rawtopic.DescribeConsumerRequest) {
1322
req.IncludeStats = true

topic/topictypes/topictypes.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ type PartitionInfo struct {
169169
Active bool
170170
ChildPartitionIDs []int64
171171
ParentPartitionIDs []int64
172+
PartitionStats PartitionStats
172173
}
173174

174175
// FromRaw convert from internal format to public. Used internally only.
@@ -178,6 +179,7 @@ func (p *PartitionInfo) FromRaw(raw *rawtopic.PartitionInfo) {
178179

179180
p.ChildPartitionIDs = clone.Int64Slice(raw.ChildPartitionIDs)
180181
p.ParentPartitionIDs = clone.Int64Slice(raw.ParentPartitionIDs)
182+
p.PartitionStats.FromRaw(&raw.PartitionStats)
181183
}
182184

183185
type MultipleWindowsStat struct {

topic/topictypes/topictypes_test.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import (
1111
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1212
)
1313

14+
var (
15+
fourPM = time.Date(2024, 0o1, 0o1, 16, 0, 0, 0, time.UTC)
16+
hour = time.Hour
17+
)
18+
1419
func TestTopicDescriptionFromRaw(t *testing.T) {
1520
testData := []struct {
1621
testName string
@@ -64,6 +69,20 @@ func TestTopicDescriptionFromRaw(t *testing.T) {
6469
ParentPartitionIDs: []int64{
6570
1, 2, 3,
6671
},
72+
PartitionStats: PartitionStats{
73+
PartitionsOffset: OffsetRange{
74+
Start: 10,
75+
End: 20,
76+
},
77+
StoreSizeBytes: 1024,
78+
LastWriteTime: &fourPM,
79+
MaxWriteTimeLag: &hour,
80+
BytesWritten: MultipleWindowsStat{
81+
PerMinute: 1,
82+
PerHour: 60,
83+
PerDay: 1440,
84+
},
85+
},
6786
},
6887
{
6988
PartitionID: 43,
@@ -104,6 +123,26 @@ func TestTopicDescriptionFromRaw(t *testing.T) {
104123
ParentPartitionIDs: []int64{
105124
1, 2, 3,
106125
},
126+
PartitionStats: rawtopic.PartitionStats{
127+
PartitionsOffset: rawtopiccommon.OffsetRange{
128+
Start: 10,
129+
End: 20,
130+
},
131+
StoreSizeBytes: 1024,
132+
LastWriteTime: rawoptional.Time{
133+
Value: fourPM,
134+
HasValue: true,
135+
},
136+
MaxWriteTimeLag: rawoptional.Duration{
137+
Value: hour,
138+
HasValue: true,
139+
},
140+
BytesWritten: rawtopic.MultipleWindowsStat{
141+
PerMinute: 1,
142+
PerHour: 60,
143+
PerDay: 1440,
144+
},
145+
},
107146
},
108147
{
109148
PartitionID: 43,
@@ -165,8 +204,6 @@ func TestTopicDescriptionFromRaw(t *testing.T) {
165204
}
166205

167206
func TestTopicConsumerDescriptionFromRaw(t *testing.T) {
168-
fourPM := time.Date(2024, 0o1, 0o1, 16, 0, 0, 0, time.UTC)
169-
hour := time.Hour
170207
testData := []struct {
171208
testName string
172209
expectedDescription TopicConsumerDescription

0 commit comments

Comments
 (0)