Skip to content

Commit 5859b3d

Browse files
authored
Introduce user scanner to Ruler/Alertmanager (#6999)
* Add user scanner to Ruler/Alertmanager Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * Move user related files to pkg/util/users Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * changelog Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * delete unused field in am Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * make run user index update loop when sharding disable Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * complete isUserOwned function in am/ruler Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> * fix typo Signed-off-by: SungJin1212 <tjdwls1201@gmail.com> --------- Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 6f8e9e0 commit 5859b3d

File tree

135 files changed

+1231
-716
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

135 files changed

+1231
-716
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [ENHANCEMENT] Ingester: Add `enable_matcher_optimization` config to apply low selectivity matchers lazily. #7063
99
* [ENHANCEMENT] Distributor: Add a label references validation for remote write v2 request. #7074
1010
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
11+
* [ENHANCEMENT] Alertmanager/Ruler: Introduce a user scanner to reduce the number of list calls to object storage. #6999
1112
* [ENHANCEMENT] Ruler: Add DecodingConcurrency config flag for Thanos Engine. #7118
1213
* [ENHANCEMENT] Compactor: Avoid double compaction by cleaning partition files in 2 cycles. #7129
1314
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,6 +1940,11 @@ blocks_storage:
19401940
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
19411941
[max_stale_period: <duration> | default = 1h]
19421942

1943+
# How frequently the user index file is updated. It only takes effect when
1944+
# the user scan strategy is user_index.
1945+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
1946+
[clean_up_interval: <duration> | default = 15m]
1947+
19431948
# TTL of the cached users. 0 disables caching and relies on caching at
19441949
# bucket client level.
19451950
# CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,6 +2017,11 @@ blocks_storage:
20172017
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
20182018
[max_stale_period: <duration> | default = 1h]
20192019

2020+
# How frequently the user index file is updated. It only takes effect when
2021+
# the user scan strategy is user_index.
2022+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
2023+
[clean_up_interval: <duration> | default = 15m]
2024+
20202025
# TTL of the cached users. 0 disables caching and relies on caching at
20212026
# bucket client level.
20222027
# CLI flag: -blocks-storage.users-scanner.cache-ttl

docs/configuration/config-file-reference.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,26 @@ local:
986986
# Path at which alertmanager configurations are stored.
987987
# CLI flag: -alertmanager-storage.local.path
988988
[path: <string> | default = ""]
989+
990+
users_scanner:
991+
# Strategy to use to scan users. Supported values are: list, user_index.
992+
# CLI flag: -alertmanager-storage.users-scanner.strategy
993+
[strategy: <string> | default = "list"]
994+
995+
# Maximum period of time to consider the user index as stale. Fall back to the
996+
# base scanner if stale. Only valid when strategy is user_index.
997+
# CLI flag: -alertmanager-storage.users-scanner.user-index.max-stale-period
998+
[max_stale_period: <duration> | default = 1h]
999+
1000+
# How frequently the user index file is updated. It only takes effect when
1001+
# the user scan strategy is user_index.
1002+
# CLI flag: -alertmanager-storage.users-scanner.user-index.cleanup-interval
1003+
[clean_up_interval: <duration> | default = 15m]
1004+
1005+
# TTL of the cached users. 0 disables caching and relies on caching at bucket
1006+
# client level.
1007+
# CLI flag: -alertmanager-storage.users-scanner.cache-ttl
1008+
[cache_ttl: <duration> | default = 0s]
9891009
```
9901010
9911011
### `blocks_storage_config`
@@ -2602,6 +2622,11 @@ users_scanner:
26022622
# CLI flag: -blocks-storage.users-scanner.user-index.max-stale-period
26032623
[max_stale_period: <duration> | default = 1h]
26042624

2625+
# How frequently the user index file is updated. It only takes effect when
2626+
# the user scan strategy is user_index.
2627+
# CLI flag: -blocks-storage.users-scanner.user-index.cleanup-interval
2628+
[clean_up_interval: <duration> | default = 15m]
2629+
26052630
# TTL of the cached users. 0 disables caching and relies on caching at bucket
26062631
# client level.
26072632
# CLI flag: -blocks-storage.users-scanner.cache-ttl
@@ -5832,6 +5857,26 @@ local:
58325857
# Directory to scan for rules
58335858
# CLI flag: -ruler-storage.local.directory
58345859
[directory: <string> | default = ""]
5860+
5861+
users_scanner:
5862+
# Strategy to use to scan users. Supported values are: list, user_index.
5863+
# CLI flag: -ruler-storage.users-scanner.strategy
5864+
[strategy: <string> | default = "list"]
5865+
5866+
# Maximum period of time to consider the user index as stale. Fall back to the
5867+
# base scanner if stale. Only valid when strategy is user_index.
5868+
# CLI flag: -ruler-storage.users-scanner.user-index.max-stale-period
5869+
[max_stale_period: <duration> | default = 1h]
5870+
5871+
# How frequently the user index file is updated. It only takes effect when
5872+
# the user scan strategy is user_index.
5873+
# CLI flag: -ruler-storage.users-scanner.user-index.cleanup-interval
5874+
[clean_up_interval: <duration> | default = 15m]
5875+
5876+
# TTL of the cached users. 0 disables caching and relies on caching at bucket
5877+
# client level.
5878+
# CLI flag: -ruler-storage.users-scanner.cache-ttl
5879+
[cache_ttl: <duration> | default = 0s]
58355880
```
58365881
58375882
### `runtime_configuration_storage_config`

integration/alertmanager_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,41 @@ func TestAlertmanager(t *testing.T) {
6868
assertServiceMetricsPrefixes(t, AlertManager, alertmanager)
6969
}
7070

71+
func TestAlertmanagerWithUserIndexUpdater(t *testing.T) {
72+
s, err := e2e.NewScenario(networkName)
73+
require.NoError(t, err)
74+
defer s.Close()
75+
76+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml)))
77+
78+
// Start dependencies.
79+
consul := e2edb.NewConsul()
80+
minio := e2edb.NewMinio(9000, alertsBucketName)
81+
require.NoError(t, s.StartAndWaitReady(consul, minio))
82+
83+
baseFlags := mergeFlags(AlertmanagerFlags(), AlertmanagerS3Flags())
84+
flags := mergeFlags(baseFlags, AlertmanagerShardingFlags(consul.NetworkHTTPEndpoint(), 1), map[string]string{
85+
"-alertmanager-storage.users-scanner.strategy": "user_index",
86+
"-alertmanager-storage.users-scanner.user-index.cleanup-interval": "15s",
87+
"-alertmanager.configs.poll-interval": "5s",
88+
})
89+
90+
am := e2ecortex.NewAlertmanager(
91+
"alertmanager",
92+
flags,
93+
"",
94+
)
95+
96+
require.NoError(t, s.StartAndWaitReady(am))
97+
// To make sure user index file is updated/scanned
98+
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"}),
99+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
100+
)
101+
require.NoError(t, am.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"}),
102+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "alertmanager")),
103+
)
104+
}
105+
71106
func TestAlertmanagerStoreAPI(t *testing.T) {
72107
s, err := e2e.NewScenario(networkName)
73108
require.NoError(t, err)

integration/ruler_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,65 @@ func TestRulerAPI(t *testing.T) {
142142
}
143143
}
144144

145+
func TestRulerWithUserIndexUpdater(t *testing.T) {
146+
s, err := e2e.NewScenario(networkName)
147+
require.NoError(t, err)
148+
defer s.Close()
149+
150+
// Start dependencies.
151+
consul := e2edb.NewConsul()
152+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
153+
require.NoError(t, s.StartAndWaitReady(consul, minio))
154+
155+
// Configure the ruler.
156+
rulerFlags := mergeFlags(
157+
BlocksStorageFlags(),
158+
RulerFlags(),
159+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
160+
map[string]string{
161+
"-ruler.sharding-strategy": "shuffle-sharding",
162+
"-ruler-storage.users-scanner.strategy": "user_index",
163+
"-ruler-storage.users-scanner.user-index.cleanup-interval": "15s",
164+
"-ruler.tenant-shard-size": "1",
165+
// Since we're not going to run any rule, we don't need the
166+
// store-gateway to be configured to a valid address.
167+
"-querier.store-gateway-addresses": "localhost:12345",
168+
// Enable the bucket index so we can skip the initial bucket scan.
169+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
170+
"-ruler.poll-interval": "2s",
171+
"-log.level": "info",
172+
},
173+
)
174+
175+
ruler := e2ecortex.NewRuler(
176+
"ruler",
177+
consul.NetworkHTTPEndpoint(),
178+
rulerFlags,
179+
"",
180+
)
181+
182+
require.NoError(t, s.StartAndWaitReady(ruler))
183+
184+
// Create a client with the ruler address configured
185+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), "user-1")
186+
require.NoError(t, err)
187+
188+
ruleGroup := createTestRuleGroup(t)
189+
ns := "ns"
190+
191+
// Set the rule group into the ruler
192+
require.NoError(t, c.SetRuleGroup(ruleGroup, ns))
193+
194+
// To make sure user index file is updated/scanned
195+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Greater(float64(0)), []string{"cortex_user_index_last_successful_update_timestamp_seconds"}),
196+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
197+
)
198+
199+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(float64(1)), []string{"cortex_user_index_scan_succeeded_total"}),
200+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "ruler")),
201+
)
202+
}
203+
145204
func TestRulerAPISingleBinary(t *testing.T) {
146205
s, err := e2e.NewScenario(networkName)
147206
require.NoError(t, err)

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ package bucketclient
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"io"
78
"strings"
89
"sync"
910

1011
"github.com/go-kit/log"
1112
"github.com/gogo/protobuf/proto"
1213
"github.com/pkg/errors"
14+
"github.com/prometheus/client_golang/prometheus"
1315
"github.com/thanos-io/objstore"
14-
15-
"github.com/cortexproject/cortex/pkg/storage/tsdb"
16+
"github.com/thanos-io/thanos/pkg/extprom"
1617

1718
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1819
"github.com/cortexproject/cortex/pkg/storage/bucket"
1920
"github.com/cortexproject/cortex/pkg/util/concurrency"
2021
"github.com/cortexproject/cortex/pkg/util/runutil"
22+
"github.com/cortexproject/cortex/pkg/util/users"
2123
)
2224

2325
const (
@@ -45,27 +47,54 @@ type BucketAlertStore struct {
4547
amBucket objstore.Bucket
4648
cfgProvider bucket.TenantConfigProvider
4749
logger log.Logger
50+
51+
usersScanner users.Scanner
52+
userIndexUpdater *users.UserIndexUpdater
4853
}
4954

50-
func NewBucketAlertStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *BucketAlertStore {
51-
return &BucketAlertStore{
52-
alertsBucket: bucket.NewPrefixedBucketClient(bkt, alertsPrefix),
53-
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
54-
cfgProvider: cfgProvider,
55-
logger: logger,
55+
func NewBucketAlertStore(bkt objstore.InstrumentedBucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketAlertStore, error) {
56+
alertBucket := bucket.NewPrefixedBucketClient(bkt, alertsPrefix)
57+
58+
regWithComponent := extprom.WrapRegistererWith(prometheus.Labels{"component": "alertmanager"}, reg)
59+
usersScanner, err := users.NewScanner(userScannerCfg, alertBucket, logger, regWithComponent)
60+
if err != nil {
61+
return nil, errors.Wrap(err, "unable to initialize alertmanager users scanner")
5662
}
63+
64+
var userIndexUpdater *users.UserIndexUpdater
65+
if userScannerCfg.Strategy == users.UserScanStrategyUserIndex {
66+
// We hardcode strategy to be list so can ignore error.
67+
baseScanner, _ := users.NewScanner(users.UsersScannerConfig{
68+
Strategy: users.UserScanStrategyList,
69+
}, alertBucket, logger, regWithComponent)
70+
userIndexUpdater = users.NewUserIndexUpdater(alertBucket, userScannerCfg.CleanUpInterval, baseScanner, regWithComponent)
71+
}
72+
73+
return &BucketAlertStore{
74+
alertsBucket: alertBucket,
75+
amBucket: bucket.NewPrefixedBucketClient(bkt, alertmanagerPrefix),
76+
cfgProvider: cfgProvider,
77+
logger: logger,
78+
usersScanner: usersScanner,
79+
userIndexUpdater: userIndexUpdater,
80+
}, nil
81+
}
82+
83+
// GetUserIndexUpdater implements alertstore.AlertStore.
84+
func (s *BucketAlertStore) GetUserIndexUpdater() *users.UserIndexUpdater {
85+
return s.userIndexUpdater
5786
}
5887

5988
// ListAllUsers implements alertstore.AlertStore.
6089
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
61-
var userIDs []string
62-
63-
err := s.alertsBucket.Iter(ctx, "", func(key string) error {
64-
userIDs = append(userIDs, key)
65-
return nil
66-
})
67-
68-
return userIDs, err
90+
active, deleting, _, err := s.usersScanner.ScanUsers(ctx)
91+
if err != nil {
92+
return nil, fmt.Errorf("unable to list users in alertmanager store bucket: %w", err)
93+
}
94+
userIDs := make([]string, 0, len(active)+len(deleting))
95+
userIDs = append(userIDs, active...)
96+
userIDs = append(userIDs, deleting...)
97+
return userIDs, nil
6998
}
7099

71100
// GetAlertConfigs implements alertstore.AlertStore.
@@ -217,5 +246,5 @@ func (s *BucketAlertStore) getUserBucket(userID string) objstore.Bucket {
217246

218247
func (s *BucketAlertStore) getAlertmanagerUserBucket(userID string) objstore.Bucket {
219248
uBucket := bucket.NewUserBucketClient(userID, s.amBucket, s.cfgProvider)
220-
return uBucket.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
249+
return uBucket.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(uBucket.IsAccessDeniedErr, uBucket.IsObjNotFoundErr))
221250
}

pkg/alertmanager/alertstore/config.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import (
88
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/local"
99
"github.com/cortexproject/cortex/pkg/configs/client"
1010
"github.com/cortexproject/cortex/pkg/storage/bucket"
11+
"github.com/cortexproject/cortex/pkg/util/users"
1112
)
1213

13-
// Config configures a the alertmanager storage backend.
14+
// Config configures the alertmanager storage backend.
1415
type Config struct {
1516
bucket.Config `yaml:",inline"`
16-
ConfigDB client.Config `yaml:"configdb"`
17-
Local local.StoreConfig `yaml:"local"`
17+
ConfigDB client.Config `yaml:"configdb"`
18+
Local local.StoreConfig `yaml:"local"`
19+
UsersScanner users.UsersScannerConfig `yaml:"users_scanner"`
1820
}
1921

2022
// RegisterFlags registers the backend storage config.
@@ -25,6 +27,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2527
cfg.ConfigDB.RegisterFlagsWithPrefix(prefix, f)
2628
cfg.Local.RegisterFlagsWithPrefix(prefix, f)
2729
cfg.RegisterFlagsWithPrefix(prefix, f)
30+
cfg.UsersScanner.RegisterFlagsWithPrefix(prefix, f)
2831
}
2932

3033
// IsFullStateSupported returns if the given configuration supports access to FullState objects.

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
88
"github.com/cortexproject/cortex/pkg/configs/client"
99
"github.com/cortexproject/cortex/pkg/configs/userconfig"
10+
"github.com/cortexproject/cortex/pkg/util/users"
1011
)
1112

1213
const (
@@ -34,6 +35,11 @@ func NewStore(c client.Client) *Store {
3435
}
3536
}
3637

38+
// GetUserIndexUpdater implements alertstore.AlertStore.
39+
func (c *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
40+
return nil
41+
}
42+
3743
// ListAllUsers implements alertstore.AlertStore.
3844
func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
3945
configs, err := c.reloadConfigs(ctx)

pkg/alertmanager/alertstore/local/store.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/prometheus/alertmanager/config"
1212

1313
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
14+
"github.com/cortexproject/cortex/pkg/util/users"
1415
)
1516

1617
const (
@@ -43,6 +44,11 @@ func NewStore(cfg StoreConfig) (*Store, error) {
4344
return &Store{cfg}, nil
4445
}
4546

47+
// GetUserIndexUpdater implements alertstore.AlertStore.
48+
func (f *Store) GetUserIndexUpdater() *users.UserIndexUpdater {
49+
return nil
50+
}
51+
4652
// ListAllUsers implements alertstore.AlertStore.
4753
func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
4854
configs, err := f.reloadConfigs()

0 commit comments

Comments
 (0)