Skip to content

Commit d3fabff

Browse files
committed
Add slowset utility from external-resizer
1 parent f77445f commit d3fabff

File tree

2 files changed

+236
-0
lines changed

2 files changed

+236
-0
lines changed

slowset/slowset.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package slowset
18+
19+
import (
20+
"sync"
21+
"time"
22+
)
23+
24+
// SlowSet is a set of API objects that should be synced at slower rate. Key is typically the object
25+
// namespace + name and value is timestamp when the object was added to the set.
26+
type SlowSet struct {
27+
sync.RWMutex
28+
// retentionTime is the time after which an item will be removed from the set
29+
// this indicates, how long before an operation on pvc can be retried.
30+
retentionTime time.Duration
31+
32+
resyncPeriod time.Duration
33+
workSet map[string]ObjectData
34+
}
35+
36+
type ObjectData struct {
37+
Timestamp time.Time
38+
StorageClassUID string
39+
}
40+
41+
func NewSlowSet(retTime time.Duration) *SlowSet {
42+
return &SlowSet{
43+
retentionTime: retTime,
44+
resyncPeriod: 100 * time.Millisecond,
45+
workSet: make(map[string]ObjectData),
46+
}
47+
}
48+
49+
func (s *SlowSet) Add(key string, info ObjectData) bool {
50+
s.Lock()
51+
defer s.Unlock()
52+
53+
if _, ok := s.workSet[key]; ok {
54+
return false
55+
}
56+
57+
s.workSet[key] = info
58+
return true
59+
}
60+
61+
func (s *SlowSet) Get(key string) (ObjectData, bool) {
62+
s.RLock()
63+
defer s.RUnlock()
64+
65+
info, ok := s.workSet[key]
66+
return info, ok
67+
}
68+
69+
func (s *SlowSet) Contains(key string) bool {
70+
s.RLock()
71+
defer s.RUnlock()
72+
73+
info, ok := s.workSet[key]
74+
if ok && time.Since(info.Timestamp) < s.retentionTime {
75+
return true
76+
}
77+
return false
78+
}
79+
80+
func (s *SlowSet) Remove(key string) {
81+
s.Lock()
82+
defer s.Unlock()
83+
84+
delete(s.workSet, key)
85+
}
86+
87+
func (s *SlowSet) TimeRemaining(key string) time.Duration {
88+
s.RLock()
89+
defer s.RUnlock()
90+
91+
if info, ok := s.workSet[key]; ok {
92+
return s.retentionTime - time.Since(info.Timestamp)
93+
}
94+
return 0
95+
}
96+
97+
func (s *SlowSet) removeAllExpired() {
98+
s.Lock()
99+
defer s.Unlock()
100+
for key, info := range s.workSet {
101+
if time.Since(info.Timestamp) > s.retentionTime {
102+
delete(s.workSet, key)
103+
}
104+
}
105+
}
106+
107+
func (s *SlowSet) Run(stopCh <-chan struct{}) {
108+
ticker := time.NewTicker(s.resyncPeriod)
109+
defer ticker.Stop()
110+
for {
111+
select {
112+
case <-stopCh:
113+
return
114+
case <-ticker.C:
115+
s.removeAllExpired()
116+
}
117+
}
118+
}

slowset/slowset_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package slowset
18+
19+
import (
20+
"testing"
21+
"time"
22+
)
23+
24+
func TestSlowSet(t *testing.T) {
25+
tests := []struct {
26+
name string
27+
retentionTime time.Duration
28+
resyncPeriod time.Duration
29+
testFunc func(*SlowSet) bool
30+
}{
31+
{
32+
name: "Should not change time of a key if added multiple times",
33+
resyncPeriod: 100 * time.Millisecond,
34+
testFunc: func(s *SlowSet) bool {
35+
key := "key"
36+
info := ObjectData{
37+
Timestamp: time.Now(),
38+
}
39+
s.Add(key, info)
40+
time1 := s.workSet[key]
41+
s.Add(key, info)
42+
time2 := s.workSet[key]
43+
return time1 == time2
44+
},
45+
},
46+
{
47+
name: "Should remove key after retention time",
48+
retentionTime: 200 * time.Millisecond,
49+
resyncPeriod: 100 * time.Millisecond,
50+
testFunc: func(s *SlowSet) bool {
51+
key := "key"
52+
info := ObjectData{
53+
Timestamp: time.Now(),
54+
}
55+
s.Add(key, info)
56+
time.Sleep(300 * time.Millisecond)
57+
return !s.Contains(key)
58+
},
59+
},
60+
{
61+
name: "Should not remove key before retention time",
62+
retentionTime: 200 * time.Millisecond,
63+
resyncPeriod: 100 * time.Millisecond,
64+
testFunc: func(s *SlowSet) bool {
65+
key := "key"
66+
info := ObjectData{
67+
Timestamp: time.Now(),
68+
}
69+
s.Add(key, info)
70+
time.Sleep(100 * time.Millisecond)
71+
return s.Contains(key)
72+
},
73+
},
74+
{
75+
name: "Should return time remaining for added keys",
76+
retentionTime: 300 * time.Millisecond,
77+
resyncPeriod: 100 * time.Millisecond,
78+
testFunc: func(s *SlowSet) bool {
79+
key := "key"
80+
info := ObjectData{
81+
Timestamp: time.Now(),
82+
}
83+
s.Add(key, info)
84+
time.Sleep(100 * time.Millisecond)
85+
timeRemaining := s.TimeRemaining(key)
86+
return timeRemaining > 0 && timeRemaining < 300*time.Millisecond
87+
},
88+
},
89+
{
90+
name: "should return false for Contains if key is present but expired",
91+
resyncPeriod: 200 * time.Millisecond,
92+
retentionTime: 300 * time.Millisecond,
93+
testFunc: func(s *SlowSet) bool {
94+
key := "key"
95+
info := ObjectData{
96+
Timestamp: time.Now(),
97+
}
98+
s.Add(key, info)
99+
time.Sleep(301 * time.Millisecond)
100+
return !s.Contains(key)
101+
},
102+
},
103+
}
104+
105+
for i := range tests {
106+
test := tests[i]
107+
t.Run(test.name, func(t *testing.T) {
108+
s := NewSlowSet(test.retentionTime)
109+
s.resyncPeriod = test.resyncPeriod
110+
stopCh := make(chan struct{}, 1)
111+
go s.Run(stopCh)
112+
defer close(stopCh)
113+
if !test.testFunc(s) {
114+
t.Errorf("Test failed")
115+
}
116+
})
117+
}
118+
}

0 commit comments

Comments
 (0)