Skip to content

Commit 114dd9f

Browse files
authored
Introduce additional metastore compaction metrics (#4625)
* Add a few more metastore compaction metrics
* Update metric description
* Improve misconfiguration handling, variable names
* Rework stats collection approach, to avoid data races
* Bring back level label
* Fix backlog jobs calculation
* Calculate backlog jobs from batches
1 parent c5389c7 commit 114dd9f

File tree

4 files changed

+204
-22
lines changed

4 files changed

+204
-22
lines changed

pkg/metastore/compaction/compactor/compaction_queue.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ type compactionKey struct {
2323
}
2424

2525
type compactionQueue struct {
26-
config Config
27-
registerer prometheus.Registerer
28-
levels []*blockQueue
26+
config Config
27+
registerer prometheus.Registerer
28+
levels []*blockQueue
29+
globalStats *globalQueueStats
30+
statsCollector *globalQueueStatsCollector
2931
}
3032

3133
// blockQueue stages blocks as they are being added. Once a batch of blocks
@@ -40,9 +42,10 @@ type compactionQueue struct {
4042
// the queue is through explicit removal. Batch and block iterators provide
4143
// the read access.
4244
type blockQueue struct {
43-
config Config
44-
registerer prometheus.Registerer
45-
staged map[compactionKey]*stagedBlocks
45+
config Config
46+
registerer prometheus.Registerer
47+
staged map[compactionKey]*stagedBlocks
48+
globalStats *globalQueueStats
4649
// Batches ordered by arrival.
4750
head, tail *batch
4851
// Priority queue by last update: we need to flush
@@ -103,10 +106,17 @@ type batch struct {
103106
}
104107

105108
func newCompactionQueue(config Config, registerer prometheus.Registerer) *compactionQueue {
106-
return &compactionQueue{
107-
config: config,
108-
registerer: registerer,
109+
globalStats := newGlobalQueueStats(len(config.Levels))
110+
q := &compactionQueue{
111+
config: config,
112+
registerer: registerer,
113+
globalStats: globalStats,
109114
}
115+
if registerer != nil {
116+
q.statsCollector = newGlobalQueueStatsCollector(q)
117+
util.RegisterOrGet(registerer, q.statsCollector)
118+
}
119+
return q
110120
}
111121

112122
func (q *compactionQueue) reset() {
@@ -145,18 +155,19 @@ func (q *compactionQueue) blockQueue(l uint32) *blockQueue {
145155
}
146156
level := q.levels[l]
147157
if level == nil {
148-
level = newBlockQueue(q.config, q.registerer)
158+
level = newBlockQueue(q.config, q.registerer, q.globalStats)
149159
q.levels[l] = level
150160
}
151161
return level
152162
}
153163

154-
func newBlockQueue(config Config, registerer prometheus.Registerer) *blockQueue {
164+
func newBlockQueue(config Config, registerer prometheus.Registerer, globalStats *globalQueueStats) *blockQueue {
155165
return &blockQueue{
156-
config: config,
157-
registerer: registerer,
158-
staged: make(map[compactionKey]*stagedBlocks),
159-
updates: new(priorityBlockQueue),
166+
config: config,
167+
registerer: registerer,
168+
staged: make(map[compactionKey]*stagedBlocks),
169+
globalStats: globalStats,
170+
updates: new(priorityBlockQueue),
160171
}
161172
}
162173

@@ -172,6 +183,7 @@ func (q *blockQueue) stagedBlocks(k compactionKey) *stagedBlocks {
172183
staged.resetBatch()
173184
q.staged[k] = staged
174185
heap.Push(q.updates, staged)
186+
q.globalStats.AddQueues(k, 1)
175187
if q.registerer != nil {
176188
staged.collector = newQueueStatsCollector(staged)
177189
util.RegisterOrGet(q.registerer, staged.collector)
@@ -189,6 +201,7 @@ func (q *blockQueue) removeStaged(s *stagedBlocks) {
189201
panic("bug: attempt to delete compaction queue with an invalid priority index")
190202
}
191203
heap.Remove(q.updates, s.heapIndex)
204+
q.globalStats.AddQueues(s.key, -1)
192205
}
193206

194207
func (s *stagedBlocks) push(block blockEntry) bool {
@@ -203,6 +216,7 @@ func (s *stagedBlocks) push(block blockEntry) bool {
203216
}
204217
s.batch.size++
205218
s.stats.blocks.Add(1)
219+
s.queue.globalStats.AddBlocks(s.key, 1)
206220
if s.queue.config.exceedsMaxSize(s.batch) ||
207221
s.queue.config.exceedsMaxAge(s.batch, s.updatedAt) {
208222
s.flush()
@@ -245,6 +259,7 @@ func (s *stagedBlocks) delete(block string) blockEntry {
245259
ref.batch.blocks[ref.index] = zeroBlockEntry
246260
ref.batch.size--
247261
s.stats.blocks.Add(-1)
262+
s.queue.globalStats.AddBlocks(s.key, -1)
248263
if ref.batch.size == 0 {
249264
if ref.batch != s.batch {
250265
// We should never ever try to delete the staging batch from the
@@ -311,6 +326,7 @@ func (q *blockQueue) pushBatch(b *batch) bool {
311326
b.staged.tail = b
312327

313328
b.staged.stats.batches.Add(1)
329+
q.globalStats.AddBatches(b.staged.key, 1)
314330
return true
315331
}
316332

@@ -357,6 +373,7 @@ func (q *blockQueue) removeBatch(b *batch) bool {
357373
b.prev = nil
358374

359375
b.staged.stats.batches.Add(-1)
376+
q.globalStats.AddBatches(b.staged.key, -1)
360377
return true
361378
}
362379

pkg/metastore/compaction/compactor/compaction_queue_test.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,13 @@ import (
1414

1515
func testBlockEntry(id int) blockEntry { return blockEntry{id: strconv.Itoa(id)} }
1616

17+
func testBlockQueue(cfg Config) *blockQueue {
18+
stats := newGlobalQueueStats(len(cfg.Levels))
19+
return newBlockQueue(cfg, nil, stats)
20+
}
21+
1722
func TestBlockQueue_Push(t *testing.T) {
18-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
23+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
1924
key := compactionKey{tenant: "t", shard: 1}
2025

2126
result := q.stagedBlocks(key).push(testBlockEntry(1))
@@ -46,7 +51,7 @@ func TestBlockQueue_Push(t *testing.T) {
4651
}
4752

4853
func TestBlockQueue_DuplicateBlock(t *testing.T) {
49-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
54+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
5055
key := compactionKey{tenant: "t", shard: 1}
5156

5257
require.True(t, q.stagedBlocks(key).push(testBlockEntry(1)))
@@ -56,7 +61,7 @@ func TestBlockQueue_DuplicateBlock(t *testing.T) {
5661
}
5762

5863
func TestBlockQueue_Remove(t *testing.T) {
59-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
64+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
6065
key := compactionKey{tenant: "t", shard: 1}
6166
q.stagedBlocks(key).push(testBlockEntry(1))
6267
q.stagedBlocks(key).push(testBlockEntry(2))
@@ -73,7 +78,7 @@ func TestBlockQueue_Remove(t *testing.T) {
7378
}
7479

7580
func TestBlockQueue_RemoveNotFound(t *testing.T) {
76-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
81+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
7782
key := compactionKey{tenant: "t", shard: 1}
7883
remove(q, key, "1")
7984
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -85,7 +90,7 @@ func TestBlockQueue_RemoveNotFound(t *testing.T) {
8590
}
8691

8792
func TestBlockQueue_Linking(t *testing.T) {
88-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}}, nil)
93+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 2}}})
8994
key := compactionKey{tenant: "t", shard: 1}
9095

9196
q.stagedBlocks(key).push(testBlockEntry(1))
@@ -154,7 +159,7 @@ func TestBlockQueue_EmptyQueue(t *testing.T) {
154159
numBlocksPerKey = 100
155160
)
156161

157-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
162+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
158163
keys := make([]compactionKey, numKeys)
159164
for i := 0; i < numKeys; i++ {
160165
keys[i] = compactionKey{
@@ -254,7 +259,7 @@ func TestBlockQueue_FlushByAge(t *testing.T) {
254259
}
255260

256261
func TestBlockQueue_BatchIterator(t *testing.T) {
257-
q := newBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}}, nil)
262+
q := testBlockQueue(Config{Levels: []LevelConfig{{MaxBlocks: 3}}})
258263
keys := []compactionKey{
259264
{tenant: "t-1", shard: 1},
260265
{tenant: "t-2", shard: 2},

pkg/metastore/compaction/compactor/metrics.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package compactor
22

33
import (
44
"strconv"
5+
"sync/atomic"
56

67
"github.com/prometheus/client_golang/prometheus"
78
)
@@ -66,3 +67,85 @@ func (b *queueStatsCollector) Collect(m chan<- prometheus.Metric) {
6667
m <- prometheus.MustNewConstMetric(b.rejected, prometheus.CounterValue, float64(b.stats.rejected.Load()))
6768
m <- prometheus.MustNewConstMetric(b.missed, prometheus.CounterValue, float64(b.stats.missed.Load()))
6869
}
70+
71+
// globalQueueStats holds per-level counters aggregated across all compaction
// queues. Each slice is indexed by compaction level, and every element is an
// atomic counter, so it can be updated and read without additional locking.
type globalQueueStats struct {
	// Number of blocks, number of queues, and number of batches per level.
	blocksPerLevel, queuesPerLevel, batchesPerLevel []atomic.Int32
}
76+
77+
func newGlobalQueueStats(numLevels int) *globalQueueStats {
78+
return &globalQueueStats{
79+
blocksPerLevel: make([]atomic.Int32, numLevels),
80+
queuesPerLevel: make([]atomic.Int32, numLevels),
81+
batchesPerLevel: make([]atomic.Int32, numLevels),
82+
}
83+
}
84+
85+
func (g *globalQueueStats) AddBlocks(key compactionKey, delta int32) {
86+
g.blocksPerLevel[key.level].Add(delta)
87+
}
88+
89+
func (g *globalQueueStats) AddQueues(key compactionKey, delta int32) {
90+
g.queuesPerLevel[key.level].Add(delta)
91+
}
92+
93+
func (g *globalQueueStats) AddBatches(key compactionKey, delta int32) {
94+
g.batchesPerLevel[key.level].Add(delta)
95+
}
96+
97+
type globalQueueStatsCollector struct {
98+
compactionQueue *compactionQueue
99+
100+
blocks *prometheus.Desc
101+
queues *prometheus.Desc
102+
batches *prometheus.Desc
103+
}
104+
105+
const globalQueueMetricsPrefix = "compaction_global_queue_"
106+
107+
func newGlobalQueueStatsCollector(compactionQueue *compactionQueue) *globalQueueStatsCollector {
108+
variableLabels := []string{"level"}
109+
110+
return &globalQueueStatsCollector{
111+
compactionQueue: compactionQueue,
112+
113+
blocks: prometheus.NewDesc(
114+
globalQueueMetricsPrefix+"blocks_current",
115+
"The current number of blocks across all queues, for a compaction level.",
116+
variableLabels, nil,
117+
),
118+
119+
queues: prometheus.NewDesc(
120+
globalQueueMetricsPrefix+"queues_current",
121+
"The current number of queues, for a compaction level.",
122+
variableLabels, nil,
123+
),
124+
125+
batches: prometheus.NewDesc(
126+
globalQueueMetricsPrefix+"batches_current",
127+
"The current number of batches (jobs that are not yet created), for a compaction level.",
128+
variableLabels, nil,
129+
),
130+
}
131+
}
132+
133+
func (c *globalQueueStatsCollector) Describe(ch chan<- *prometheus.Desc) {
134+
ch <- c.blocks
135+
ch <- c.queues
136+
ch <- c.batches
137+
}
138+
139+
func (c *globalQueueStatsCollector) Collect(ch chan<- prometheus.Metric) {
140+
for levelIdx := range c.compactionQueue.config.Levels {
141+
blocksAtLevel := c.compactionQueue.globalStats.blocksPerLevel[levelIdx].Load()
142+
queuesAtLevel := c.compactionQueue.globalStats.queuesPerLevel[levelIdx].Load()
143+
batchesAtLevel := c.compactionQueue.globalStats.batchesPerLevel[levelIdx].Load()
144+
145+
levelLabel := strconv.Itoa(levelIdx)
146+
147+
ch <- prometheus.MustNewConstMetric(c.blocks, prometheus.GaugeValue, float64(blocksAtLevel), levelLabel)
148+
ch <- prometheus.MustNewConstMetric(c.queues, prometheus.GaugeValue, float64(queuesAtLevel), levelLabel)
149+
ch <- prometheus.MustNewConstMetric(c.batches, prometheus.GaugeValue, float64(batchesAtLevel), levelLabel)
150+
}
151+
}

pkg/metastore/compaction/compactor/metrics_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,80 @@ func TestCollectorRegistration(t *testing.T) {
2828
}
2929
}
3030
}
31+
32+
func TestBlockQueueAggregatedMetrics(t *testing.T) {
33+
reg := prometheus.NewRegistry()
34+
c := NewCompactor(testConfig, nil, nil, reg)
35+
36+
entries := []compaction.BlockEntry{
37+
{ID: "block1", Tenant: "A", Shard: 0, Level: 0},
38+
{ID: "block2", Tenant: "A", Shard: 0, Level: 0},
39+
{ID: "block3", Tenant: "A", Shard: 0, Level: 0},
40+
{ID: "block4", Tenant: "A", Shard: 1, Level: 0},
41+
{ID: "block5", Tenant: "B", Shard: 0, Level: 1},
42+
{ID: "block6", Tenant: "B", Shard: 0, Level: 1},
43+
{ID: "block7", Tenant: "B", Shard: 0, Level: 1},
44+
}
45+
46+
for _, e := range entries {
47+
c.enqueue(e)
48+
}
49+
50+
metrics, err := reg.Gather()
51+
if err != nil {
52+
t.Fatalf("failed to gather metrics: %v", err)
53+
}
54+
55+
var blocksTotal, queuesTotal, batchesTotal float64
56+
var foundBlocks, foundQueues, foundBatches bool
57+
58+
for _, mf := range metrics {
59+
if mf.GetName() == "compaction_global_queue_blocks_current" {
60+
for _, m := range mf.GetMetric() {
61+
blocksTotal += m.GetGauge().GetValue()
62+
foundBlocks = true
63+
}
64+
}
65+
if mf.GetName() == "compaction_global_queue_queues_current" {
66+
for _, m := range mf.GetMetric() {
67+
queuesTotal += m.GetGauge().GetValue()
68+
foundQueues = true
69+
}
70+
}
71+
72+
if mf.GetName() == "compaction_global_queue_batches_current" {
73+
for _, m := range mf.GetMetric() {
74+
batchesTotal += m.GetGauge().GetValue()
75+
foundBatches = true
76+
}
77+
}
78+
}
79+
80+
if !foundBlocks {
81+
t.Fatal("compaction_global_queue_blocks metric not found")
82+
}
83+
if !foundQueues {
84+
t.Fatal("compaction_global_queue_queues metric not found")
85+
}
86+
if !foundBatches {
87+
t.Fatal("compaction_global_queue_batches_current metric not found")
88+
}
89+
90+
if blocksTotal != 7 {
91+
t.Errorf("expected 7 total blocks, got %v", blocksTotal)
92+
}
93+
94+
if queuesTotal != 3 {
95+
t.Errorf("expected 3 total queues, got %v", queuesTotal)
96+
}
97+
98+
// testConfig.Levels[0].MaxBlocks = 3
99+
// testConfig.Levels[1].MaxBlocks = 2
100+
// (A,0): 3 blocks → 3/3 = 1 batch
101+
// (A,1): 1 block → 1/2 = 0 batches
102+
// (B,1): 3 blocks → 3/2 = 1 batch
103+
// Total = 2 batches
104+
if batchesTotal != 2 {
105+
t.Errorf("expected 2 total batches, got %v", batchesTotal)
106+
}
107+
}

0 commit comments

Comments
 (0)