
Commit b706778

feat(kubernetes): add cluster state monitoring with debounced reload (#502)
Implement automatic detection of cluster state changes (API groups, OpenShift status) with configurable polling and debounce windows. The cluster state watcher runs in the background, invalidates the discovery cache periodically, and triggers a reload callback when changes are detected after a debounce period.

- Add clusterStateWatcher to monitor API groups and OpenShift status
- Implement debounced reload to avoid excessive reloads during changes
- Add WatchClusterState method to Manager with configurable intervals
- Integrate cluster state watching in kubeconfig and single cluster providers

Signed-off-by: Nader Ziada <nziada@redhat.com>
1 parent bb17628 commit b706778
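
The commit adds one public entry point, Manager.WatchClusterState, alongside the existing kubeconfig file watcher. A minimal sketch of how a caller inside pkg/kubernetes might wire it up, assuming an already-constructed *Manager; the helper name and reload body are illustrative, not part of this commit:

// Illustrative only: mirrors what the providers in this commit do.
func watchClusterExample(m *Manager) {
	reload := func() error {
		// Rebuild anything derived from the cluster's API surface
		// (the MCP server re-registers its tools here).
		return nil
	}

	// The existing file-based kubeconfig watcher stays in place...
	m.WatchKubeConfig(reload)
	// ...and the new watcher polls API groups and the OpenShift flag
	// every 30s, coalescing changes into one reload after a 5s quiet window.
	m.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, reload)
}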

5 files changed: +216 additions, −12 deletions

pkg/kubernetes/manager.go

Lines changed: 148 additions & 0 deletions
@@ -5,8 +5,11 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
+	"sync"
+	"time"

 	"github.com/containers/kubernetes-mcp-server/pkg/config"
 	"github.com/fsnotify/fsnotify"
@@ -23,10 +26,38 @@ type Manager struct {

 	staticConfig         *config.StaticConfig
 	CloseWatchKubeConfig CloseWatchKubeConfig
+
+	clusterWatcher *clusterStateWatcher
+}
+
+// clusterState represents the cached state of the cluster
+type clusterState struct {
+	apiGroups   []string
+	isOpenShift bool
+}
+
+// clusterStateWatcher monitors cluster state changes and triggers debounced reloads
+type clusterStateWatcher struct {
+	manager        *Manager
+	pollInterval   time.Duration
+	debounceWindow time.Duration
+	lastKnownState clusterState
+	reloadCallback func() error
+	debounceTimer  *time.Timer
+	mu             sync.Mutex
+	stopCh         chan struct{}
+	stoppedCh      chan struct{}
 }

 var _ Openshift = (*Manager)(nil)

+const (
+	// DefaultClusterStatePollInterval is the default interval for polling cluster state changes
+	DefaultClusterStatePollInterval = 30 * time.Second
+	// DefaultClusterStateDebounceWindow is the default debounce window for cluster state changes
+	DefaultClusterStateDebounceWindow = 5 * time.Second
+)
+
 var (
 	ErrorKubeconfigInClusterNotAllowed = errors.New("kubeconfig manager cannot be used in in-cluster deployments")
 	ErrorInClusterNotInCluster         = errors.New("in-cluster manager cannot be used outside of a cluster")
@@ -154,6 +185,9 @@ func (m *Manager) Close() {
 	if m.CloseWatchKubeConfig != nil {
 		_ = m.CloseWatchKubeConfig()
 	}
+	if m.clusterWatcher != nil {
+		m.clusterWatcher.stop()
+	}
 }

 func (m *Manager) VerifyToken(ctx context.Context, token, audience string) (*authenticationv1api.UserInfo, []string, error) {
@@ -249,3 +283,117 @@ func applyRateLimitFromEnv(cfg *rest.Config) {
 		}
 	}
 }
+
+// WatchClusterState starts a background watcher that periodically polls for cluster state changes
+// and triggers a debounced reload when changes are detected.
+func (m *Manager) WatchClusterState(pollInterval, debounceWindow time.Duration, onClusterStateChange func() error) {
+	if m.clusterWatcher != nil {
+		m.clusterWatcher.stop()
+	}
+
+	watcher := &clusterStateWatcher{
+		manager:        m,
+		pollInterval:   pollInterval,
+		debounceWindow: debounceWindow,
+		reloadCallback: onClusterStateChange,
+		stopCh:         make(chan struct{}),
+		stoppedCh:      make(chan struct{}),
+	}
+
+	captureState := func() clusterState {
+		state := clusterState{apiGroups: []string{}}
+		if groups, err := m.accessControlClientset.DiscoveryClient().ServerGroups(); err == nil {
+			for _, group := range groups.Groups {
+				state.apiGroups = append(state.apiGroups, group.Name)
+			}
+			sort.Strings(state.apiGroups)
+		}
+		state.isOpenShift = m.IsOpenShift(context.Background())
+		return state
+	}
+	watcher.lastKnownState = captureState()
+
+	m.clusterWatcher = watcher
+
+	// Start background monitoring
+	go func() {
+		defer close(watcher.stoppedCh)
+		ticker := time.NewTicker(pollInterval)
+		defer ticker.Stop()
+
+		klog.V(2).Infof("Started cluster state watcher (poll interval: %v, debounce: %v)", pollInterval, debounceWindow)
+
+		for {
+			select {
+			case <-watcher.stopCh:
+				klog.V(2).Info("Stopping cluster state watcher")
+				return
+			case <-ticker.C:
+				// Invalidate discovery cache to get fresh API groups
+				m.accessControlClientset.DiscoveryClient().Invalidate()
+
+				watcher.mu.Lock()
+				current := captureState()
+				klog.V(3).Infof("Polled cluster state: %d API groups, OpenShift=%v", len(current.apiGroups), current.isOpenShift)
+
+				changed := current.isOpenShift != watcher.lastKnownState.isOpenShift ||
+					len(current.apiGroups) != len(watcher.lastKnownState.apiGroups)
+
+				if !changed {
+					for i := range current.apiGroups {
+						if current.apiGroups[i] != watcher.lastKnownState.apiGroups[i] {
+							changed = true
+							break
+						}
+					}
+				}

+				if changed {
+					klog.V(2).Info("Cluster state changed, scheduling debounced reload")
+					if watcher.debounceTimer != nil {
+						watcher.debounceTimer.Stop()
+					}
+					watcher.debounceTimer = time.AfterFunc(debounceWindow, func() {
+						klog.V(2).Info("Debounce window expired, triggering reload")
+						if err := onClusterStateChange(); err != nil {
+							klog.Errorf("Failed to reload: %v", err)
+						} else {
+							watcher.mu.Lock()
+							watcher.lastKnownState = captureState()
+							watcher.mu.Unlock()
+							klog.V(2).Info("Reload completed")
+						}
+					})
+				}
+				watcher.mu.Unlock()
+			}
+		}
+	}()
+}
+
+// stop stops the cluster state watcher
+func (w *clusterStateWatcher) stop() {
+	if w == nil {
+		return
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if w.debounceTimer != nil {
+		w.debounceTimer.Stop()
+	}
+
+	if w.stopCh == nil || w.stoppedCh == nil {
+		return
+	}
+
+	select {
+	case <-w.stopCh:
+		// Already closed or stopped
		return
+	default:
+		close(w.stopCh)
+		<-w.stoppedCh
+	}
+}
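
The debounce in the watcher is the standard time.AfterFunc idiom: each detected change stops any pending timer and re-arms it, so the reload callback fires only once the cluster has been quiet for the full window. A self-contained sketch of that pattern in isolation (hypothetical names, not code from this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer coalesces bursts of events into a single callback invocation.
type debouncer struct {
	mu     sync.Mutex
	timer  *time.Timer
	window time.Duration
	fn     func()
}

// trigger (re)arms the timer; fn runs only after window elapses with no further triggers.
func (d *debouncer) trigger() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer != nil {
		d.timer.Stop() // drop the previously scheduled run
	}
	d.timer = time.AfterFunc(d.window, d.fn)
}

func main() {
	d := &debouncer{window: 100 * time.Millisecond, fn: func() { fmt.Println("reload") }}
	for i := 0; i < 5; i++ { // five rapid changes...
		d.trigger()
		time.Sleep(10 * time.Millisecond)
	}
	time.Sleep(200 * time.Millisecond) // ...produce a single "reload"
}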

pkg/kubernetes/manager_test.go

Lines changed: 43 additions & 0 deletions
@@ -228,6 +228,49 @@ func (s *ManagerTestSuite) TestNewManager() {
 	})
 }

+func (s *ManagerTestSuite) TestClusterStateWatcherStop() {
+	s.Run("stop() on nil watcher", func() {
+		var watcher *clusterStateWatcher
+		// Should not panic
+		watcher.stop()
+	})
+
+	s.Run("stop() on uninitialized watcher (nil channels)", func() {
+		watcher := &clusterStateWatcher{}
+		// Should not panic even with nil channels
+		watcher.stop()
+	})
+
+	s.Run("stop() on initialized watcher", func() {
+		watcher := &clusterStateWatcher{
+			stopCh:    make(chan struct{}),
+			stoppedCh: make(chan struct{}),
+		}
+		// Close the stoppedCh to simulate a running goroutine
+		go func() {
+			<-watcher.stopCh
+			close(watcher.stoppedCh)
+		}()
+		// Should not panic and should stop cleanly
+		watcher.stop()
+	})
+
+	s.Run("stop() called multiple times", func() {
+		watcher := &clusterStateWatcher{
+			stopCh:    make(chan struct{}),
+			stoppedCh: make(chan struct{}),
+		}
+		go func() {
+			<-watcher.stopCh
+			close(watcher.stoppedCh)
+		}()
+		// First stop
+		watcher.stop()
+		// Second stop should not panic
+		watcher.stop()
+	})
+}
+
 func TestManager(t *testing.T) {
 	suite.Run(t, new(ManagerTestSuite))
 }

pkg/kubernetes/provider_kubeconfig.go

Lines changed: 1 addition & 1 deletion
@@ -120,8 +120,8 @@ func (p *kubeConfigClusterProvider) GetDefaultTarget() string {

 func (p *kubeConfigClusterProvider) WatchTargets(onKubeConfigChanged func() error) {
 	m := p.managers[p.defaultContext]
-
 	m.WatchKubeConfig(onKubeConfigChanged)
+	m.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, onKubeConfigChanged)
 }

 func (p *kubeConfigClusterProvider) Close() {

pkg/kubernetes/provider_single.go

Lines changed: 1 addition & 0 deletions
@@ -87,6 +87,7 @@ func (p *singleClusterProvider) GetTargetParameterName() string {

 func (p *singleClusterProvider) WatchTargets(watch func() error) {
 	p.manager.WatchKubeConfig(watch)
+	p.manager.WatchClusterState(DefaultClusterStatePollInterval, DefaultClusterStateDebounceWindow, watch)
 }

 func (p *singleClusterProvider) Close() {

pkg/mcp/mcp.go

Lines changed: 23 additions & 11 deletions
@@ -98,31 +98,44 @@ func NewServer(configuration Configuration) (*Server, error) {

 func (s *Server) reloadKubernetesClusterProvider() error {
 	ctx := context.Background()
-	p, err := internalk8s.NewProvider(s.configuration.StaticConfig)
+
+	newProvider, err := internalk8s.NewProvider(s.configuration.StaticConfig)
+	if err != nil {
+		return err
+	}
+
+	targets, err := newProvider.GetTargets(ctx)
 	if err != nil {
+		newProvider.Close()
 		return err
 	}

-	// close the old provider
 	if s.p != nil {
 		s.p.Close()
 	}

-	s.p = p
+	s.p = newProvider

-	targets, err := p.GetTargets(ctx)
-	if err != nil {
+	if err := s.rebuildTools(targets); err != nil {
 		return err
 	}

+	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
+
+	return nil
+}
+
+// rebuildTools rebuilds the MCP tool registry based on the current provider and targets.
+// This is called after the provider has been successfully validated and set.
+func (s *Server) rebuildTools(targets []string) error {
 	filter := CompositeFilter(
 		s.configuration.isToolApplicable,
-		ShouldIncludeTargetListTool(p.GetTargetParameterName(), targets),
+		ShouldIncludeTargetListTool(s.p.GetTargetParameterName(), targets),
 	)

 	mutator := WithTargetParameter(
-		p.GetDefaultTarget(),
-		p.GetTargetParameterName(),
+		s.p.GetDefaultTarget(),
+		s.p.GetTargetParameterName(),
 		targets,
 	)

@@ -136,7 +149,7 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 	applicableTools := make([]api.ServerTool, 0)
 	s.enabledTools = make([]string, 0)
 	for _, toolset := range s.configuration.Toolsets() {
-		for _, tool := range toolset.GetTools(p) {
+		for _, tool := range toolset.GetTools(s.p) {
 			tool := mutator(tool)
 			if !filter(tool) {
 				continue
@@ -157,6 +170,7 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 	}
 	s.server.RemoveTools(toolsToRemove...)

+	// Add new tools
 	for _, tool := range applicableTools {
 		goSdkTool, goSdkToolHandler, err := ServerToolToGoSdkTool(s, tool)
 		if err != nil {
@@ -165,8 +179,6 @@ func (s *Server) reloadKubernetesClusterProvider() error {
 		s.server.AddTool(goSdkTool, goSdkToolHandler)
 	}

-	// start new watch
-	s.p.WatchTargets(s.reloadKubernetesClusterProvider)
 	return nil
 }
