diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index c84ae24fa..e78b6877c 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -489,7 +489,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName { names := make([]types.NamespacedName, 0, len(pods)) for _, p := range pods { - names = append(names, p.GetPod().NamespacedName) + names = append(names, p.GetMetadata().NamespacedName) } return names } diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 1c7a90528..a61e4ee74 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -38,10 +38,10 @@ type FakePodMetrics struct { } func (fpm *FakePodMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics()) + return fmt.Sprintf("Metadata: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics()) } -func (fpm *FakePodMetrics) GetPod() *backend.Pod { +func (fpm *FakePodMetrics) GetMetadata() *backend.Pod { return fpm.Pod } @@ -49,8 +49,8 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { return fpm.Metrics } -func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) { - fpm.Pod = pod +func (fpm *FakePodMetrics) UpdateMetadata(metadata *datalayer.EndpointMetadata) { + fpm.Pod = metadata } func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { return fpm.Attributes diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index 4d22ef18c..5968842b3 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -25,7 +25,6 @@ import ( "github.com/go-logr/logr" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -35,7 +34,7 @@ const ( ) type podMetrics struct { - pod atomic.Pointer[backend.Pod] + metadata atomic.Pointer[datalayer.EndpointMetadata] metrics atomic.Pointer[MetricsState] pmc PodMetricsClient ds datalayer.PoolInfo @@ -49,23 +48,23 @@ type podMetrics struct { } type PodMetricsClient interface { - FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState) (*MetricsState, error) + FetchMetrics(ctx context.Context, pod *datalayer.EndpointMetadata, existing *MetricsState) (*MetricsState, error) } func (pm *podMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics()) + return fmt.Sprintf("Metadata: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } -func (pm *podMetrics) GetPod() *backend.Pod { - return pm.pod.Load() +func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata { + return pm.metadata.Load() } func (pm *podMetrics) GetMetrics() *MetricsState { return pm.metrics.Load() } -func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { - pm.pod.Store(pod) +func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { + pm.metadata.Store(pod) } // start starts a goroutine exactly once to periodically update metrics. The goroutine will be @@ -73,7 +72,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { func (pm *podMetrics) startRefreshLoop(ctx context.Context) { pm.startOnce.Do(func() { go func() { - pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "metadata", pm.GetMetadata()) ticker := time.NewTicker(pm.interval) defer ticker.Stop() for { @@ -84,7 +83,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { return case <-ticker.C: // refresh metrics periodically if err := pm.refreshMetrics(); err != nil { - pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod()) + pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "metadata", pm.GetMetadata()) } } } @@ -95,7 +94,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { func (pm *podMetrics) refreshMetrics() error { ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() - updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics()) + updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics()) if err != nil { pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err) } @@ -115,7 +114,7 @@ func (pm *podMetrics) refreshMetrics() error { } func (pm *podMetrics) stopRefreshLoop() { - pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "metadata", pm.GetMetadata()) pm.stopOnce.Do(func() { close(pm.done) }) diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 8a5561c0e..8d84701fc 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -29,7 +29,7 @@ import ( ) var ( - pod1Info = &datalayer.PodInfo{ + pod1Info = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1-rank-0", Namespace: "default", diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 2c334a750..e21d4aa03 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -48,7 +48,7 @@ type PodMetricsFactory struct { refreshMetricsInterval time.Duration } -func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics { +func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *datalayer.EndpointMetadata, ds datalayer.PoolInfo) datalayer.Endpoint { pm := &podMetrics{ pmc: f.pmc, ds: ds, @@ -56,9 +56,9 @@ func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalaye startOnce: sync.Once{}, stopOnce: sync.Once{}, done: make(chan struct{}), - logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName), + logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName), } - pm.pod.Store(pod) + pm.metadata.Store(metadata) pm.metrics.Store(NewMetricsState()) pm.startRefreshLoop(parentCtx) diff --git a/pkg/epp/backend/pod.go b/pkg/epp/backend/pod.go index 324a7479a..e24494042 100644 --- a/pkg/epp/backend/pod.go +++ b/pkg/epp/backend/pod.go @@ -20,4 +20,4 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) -type Pod = datalayer.PodInfo +type Pod = datalayer.EndpointMetadata diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 4d1f22348..dfe8fabfa 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -128,7 +128,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } endpointPool1 := pool.InferencePoolToEndpointPool(pool1) - if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -146,7 +146,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -162,7 +162,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -176,7 +176,7 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } @@ -184,7 +184,7 @@ func TestInferencePoolReconciler(t *testing.T) { type diffStoreParams struct { wantPool *datalayer.EndpointPool - wantPods []string + wantEndpoints []string wantObjectives []*v1alpha2.InferenceObjective } @@ -195,15 +195,15 @@ func diffStore(store datastore.Datastore, params diffStoreParams) string { } // Default wantPods if not set because PodGetAll returns an empty slice when empty. - if params.wantPods == nil { - params.wantPods = []string{} + if params.wantEndpoints == nil { + params.wantEndpoints = []string{} } - gotPods := []string{} - for _, pm := range store.PodList(datastore.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + gotEndpoints := []string{} + for _, em := range store.PodList(datastore.AllPodsPredicate) { + gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name) } - if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { - return "pods:" + diff + if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "endpoints:" + diff } // Default wantModels if not set because ModelGetAll returns an empty slice when empty. @@ -355,8 +355,8 @@ func xDiffStore(store datastore.Datastore, params xDiffStoreParams) string { params.wantPods = []string{} } gotPods := []string{} - for _, pm := range store.PodList(datastore.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + for _, em := range store.PodList(datastore.AllPodsPredicate) { + gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { return "pods:" + diff diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 11b52578c..02ed8a36c 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -219,7 +219,7 @@ func TestPodReconciler(t *testing.T) { var gotPods []*corev1.Pod for _, pm := range store.PodList(datastore.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 86a8f7b4e..f7be8125b 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc started := false c.startOnce.Do(func() { - logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress()) c.ctx, c.cancel = context.WithCancel(ctx) started = true ready = make(chan struct{}) diff --git a/pkg/epp/datalayer/collector_test.go b/pkg/epp/datalayer/collector_test.go index f05175f54..7752ebc7b 100644 --- a/pkg/epp/datalayer/collector_test.go +++ b/pkg/epp/datalayer/collector_test.go @@ -32,14 +32,14 @@ import ( // --- Test Stubs --- func defaultEndpoint() Endpoint { - pod := &PodInfo{ + meta := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod-name", Namespace: "default", }, Address: "1.2.3.4:5678", } - ms := NewEndpoint(pod, nil) + ms := NewEndpoint(meta, nil) return ms } diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 2d262eb3a..67ad56010 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -21,10 +21,10 @@ import ( "sync/atomic" ) -// EndpointPodState allows management of the Pod related attributes. -type EndpointPodState interface { - GetPod() *PodInfo - UpdatePod(*PodInfo) +// EndpointMetaState allows management of the EndpointMetadata related attributes. +type EndpointMetaState interface { + GetMetadata() *EndpointMetadata + UpdateMetadata(*EndpointMetadata) GetAttributes() *Attributes } @@ -37,22 +37,22 @@ type EndpointMetricsState interface { // Endpoint represents an inference serving endpoint and its related attributes. type Endpoint interface { fmt.Stringer - EndpointPodState + EndpointMetaState EndpointMetricsState AttributeMap } // ModelServer is an implementation of the Endpoint interface. type ModelServer struct { - pod atomic.Pointer[PodInfo] + pod atomic.Pointer[EndpointMetadata] metrics atomic.Pointer[Metrics] attributes *Attributes } -// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. -func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { - if pod == nil { - pod = &PodInfo{} +// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics. +func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { + if meta == nil { + meta = &EndpointMetadata{} } if metrics == nil { metrics = NewMetrics() @@ -60,7 +60,7 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { ep := &ModelServer{ attributes: NewAttributes(), } - ep.UpdatePod(pod) + ep.UpdateMetadata(meta) ep.UpdateMetrics(metrics) return ep } @@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { // String returns a representation of the ModelServer. For brevity, only names of // extended attributes are returned and not their values. func (srv *ModelServer) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys()) + return fmt.Sprintf("Metadata: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) } -func (srv *ModelServer) GetPod() *PodInfo { +func (srv *ModelServer) GetMetadata() *EndpointMetadata { return srv.pod.Load() } -func (srv *ModelServer) UpdatePod(pod *PodInfo) { +func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) { srv.pod.Store(pod) } diff --git a/pkg/epp/datalayer/podinfo.go b/pkg/epp/datalayer/endpoint_metadata.go similarity index 63% rename from pkg/epp/datalayer/podinfo.go rename to pkg/epp/datalayer/endpoint_metadata.go index 7cbd6d886..70c78e0df 100644 --- a/pkg/epp/datalayer/podinfo.go +++ b/pkg/epp/datalayer/endpoint_metadata.go @@ -30,8 +30,8 @@ type Addressable interface { GetNamespacedName() types.NamespacedName } -// PodInfo represents the relevant Kubernetes Pod state of an inference server. -type PodInfo struct { +// EndpointMetadata represents the relevant Kubernetes Pod state of an inference server. +type EndpointMetadata struct { NamespacedName types.NamespacedName PodName string Address string @@ -40,16 +40,16 @@ type PodInfo struct { Labels map[string]string } -// String returns a string representation of the pod. -func (p *PodInfo) String() string { - if p == nil { +// String returns a string representation of the endpoint. +func (e *EndpointMetadata) String() string { + if e == nil { return "" } - return fmt.Sprintf("%+v", *p) + return fmt.Sprintf("%+v", *e) } // Clone returns a full copy of the object. -func (p *PodInfo) Clone() *PodInfo { +func (p *EndpointMetadata) Clone() *EndpointMetadata { if p == nil { return nil } @@ -58,7 +58,7 @@ func (p *PodInfo) Clone() *PodInfo { for key, value := range p.Labels { clonedLabels[key] = value } - return &PodInfo{ + return &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: p.NamespacedName.Name, Namespace: p.NamespacedName.Namespace, @@ -71,22 +71,22 @@ func (p *PodInfo) Clone() *PodInfo { } } -// GetNamespacedName gets the namespace name of the Pod. -func (p *PodInfo) GetNamespacedName() types.NamespacedName { - return p.NamespacedName +// GetNamespacedName gets the namespace name of the Endpoint. +func (e *EndpointMetadata) GetNamespacedName() types.NamespacedName { + return e.NamespacedName } -// GetIPAddress returns the Pod's IP address. -func (p *PodInfo) GetIPAddress() string { - return p.Address +// GetIPAddress returns the Endpoint's IP address. +func (e *EndpointMetadata) GetIPAddress() string { + return e.Address } -// GetPort returns the Pod's inference port. -func (p *PodInfo) GetPort() string { - return p.Port +// GetPort returns the Endpoint's inference port. +func (e *EndpointMetadata) GetPort() string { + return e.Port } -// GetMetricsHost returns the pod's metrics host (ip:port) -func (p *PodInfo) GetMetricsHost() string { - return p.MetricsHost +// GetMetricsHost returns the Endpoint's metrics host (ip:port) +func (e *EndpointMetadata) GetMetricsHost() string { + return e.MetricsHost } diff --git a/pkg/epp/datalayer/podinfo_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go similarity index 90% rename from pkg/epp/datalayer/podinfo_test.go rename to pkg/epp/datalayer/endpoint_metadata_test.go index baf804a22..fb9bc7a93 100644 --- a/pkg/epp/datalayer/podinfo_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -48,14 +48,14 @@ var ( PodIP: podip, }, } - expected = &PodInfo{ + expected = &EndpointMetadata{ NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, Address: podip, Labels: labels, } ) -func TestPodInfoClone(t *testing.T) { +func TestEndpointMetadataClone(t *testing.T) { clone := expected.Clone() assert.NotSame(t, expected, clone) if diff := cmp.Diff(expected, clone); diff != "" { @@ -66,8 +66,8 @@ func TestPodInfoClone(t *testing.T) { assert.Equal(t, "prod", expected.Labels["env"], "mutating clone should not affect original") } -func TestPodInfoString(t *testing.T) { - podinfo := PodInfo{ +func TestEndpointMetadataString(t *testing.T) { + endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, @@ -79,7 +79,7 @@ func TestPodInfoString(t *testing.T) { Labels: labels, } - s := podinfo.String() + s := endpointMetadata.String() assert.Contains(t, s, name) assert.Contains(t, s, namespace) assert.Contains(t, s, podip) diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 3a81763d5..78765095c 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -46,7 +46,7 @@ type PoolInfo interface { // providing methods to allocate and retire endpoints. This can potentially be used for // pooled memory or other management chores in the implementation. type EndpointFactory interface { - NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint + NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint ReleaseEndpoint(ep Endpoint) } @@ -71,8 +71,8 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati // NewEndpoint implements EndpointFactory.NewEndpoint. // Creates a new endpoint and starts its associated collector with its own ticker. // Guards against multiple concurrent calls for the same endpoint. -func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint { - key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name} +func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint { + key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name} logger := log.FromContext(parent).WithValues("pod", key) if _, ok := lc.collectors.Load(key); ok { @@ -80,7 +80,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, return nil } - endpoint := NewEndpoint(inpod, nil) + endpoint := NewEndpoint(inEndpointMetadata, nil) collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded { @@ -102,7 +102,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, // ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint // Stops the collector and cleans up resources for the endpoint func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) { - key := ep.GetPod().GetNamespacedName() + key := ep.GetMetadata().GetNamespacedName() if value, ok := lc.collectors.LoadAndDelete(key); ok { collector := value.(*Collector) diff --git a/pkg/epp/datalayer/factory_test.go b/pkg/epp/datalayer/factory_test.go index 302baa396..89a51ffd2 100644 --- a/pkg/epp/datalayer/factory_test.go +++ b/pkg/epp/datalayer/factory_test.go @@ -31,7 +31,7 @@ func TestFactory(t *testing.T) { source := &FakeDataSource{} factory := NewEndpointFactory([]DataSource{source}, 100*time.Millisecond) - pod1 := &PodInfo{ + pod1 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", @@ -44,7 +44,7 @@ func TestFactory(t *testing.T) { dup := factory.NewEndpoint(context.Background(), pod1, nil) assert.Nil(t, dup, "expected to fail to create a duplicate collector") - pod2 := &PodInfo{ + pod2 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datalayer/fake.go b/pkg/epp/datalayer/fake.go index 24d01c139..c82fd6768 100644 --- a/pkg/epp/datalayer/fake.go +++ b/pkg/epp/datalayer/fake.go @@ -49,8 +49,8 @@ func (fds *FakeDataSource) AddExtractor(_ Extractor) error { return nil } func (fds *FakeDataSource) Collect(ctx context.Context, ep Endpoint) error { atomic.AddInt64(&fds.callCount, 1) - if metrics, ok := fds.Metrics[ep.GetPod().Clone().NamespacedName]; ok { - if _, ok := fds.Errors[ep.GetPod().Clone().NamespacedName]; !ok { + if metrics, ok := fds.Metrics[ep.GetMetadata().Clone().NamespacedName]; ok { + if _, ok := fds.Errors[ep.GetMetadata().Clone().NamespacedName]; !ok { ep.UpdateMetrics(metrics) } } diff --git a/pkg/epp/datalayer/metrics/datasource.go b/pkg/epp/datalayer/metrics/datasource.go index df4b1d378..5e79ba599 100644 --- a/pkg/epp/datalayer/metrics/datasource.go +++ b/pkg/epp/datalayer/metrics/datasource.go @@ -94,8 +94,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error { // Collect is triggered by the data layer framework to fetch potentially new // MSP metrics data for an endpoint. func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error { - target := dataSrc.getMetricsEndpoint(ep.GetPod()) - families, err := dataSrc.client.Get(ctx, target, ep.GetPod()) + target := dataSrc.getMetricsEndpoint(ep.GetMetadata()) + families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata()) if err != nil { return err diff --git a/pkg/epp/datalayer/metrics/datasource_test.go b/pkg/epp/datalayer/metrics/datasource_test.go index 35255b7bc..9a834df55 100644 --- a/pkg/epp/datalayer/metrics/datasource_test.go +++ b/pkg/epp/datalayer/metrics/datasource_test.go @@ -50,7 +50,7 @@ func TestDatasource(t *testing.T) { ctx := context.Background() factory := datalayer.NewEndpointFactory([]datalayer.DataSource{source}, 100*time.Millisecond) - pod := &datalayer.PodInfo{ + pod := &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index 5404b3b62..e296cd3bc 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ b/pkg/epp/datalayer/metrics/extractor.go @@ -153,7 +153,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } - logger := log.FromContext(ctx).WithValues("pod", ep.GetPod().NamespacedName) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().NamespacedName) if updated { clone.UpdateTime = time.Now() logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone) diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index 4bf68cf0a..0c0a20761 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -72,21 +72,21 @@ func TestLogger(t *testing.T) { logOutput := b.read() assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}") - assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Pod: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") + assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5") assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048") - assert.Contains(t, logOutput, "Pod: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") + assert.Contains(t, logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } -var pod1 = &datalayer.PodInfo{ +var pod1 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", }, Address: "1.2.3.4:5678", } -var pod2 = &datalayer.PodInfo{ +var pod2 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index e1e995ffe..fa879b5cc 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -240,13 +240,13 @@ func (ds *datastore) ModelRewriteGetAll() []*v1alpha2.InferenceModelRewrite { // /// Pods/endpoints APIs /// // TODO: add a flag for callers to specify the staleness threshold for metrics. // ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694 -func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { - res := []backendmetrics.PodMetrics{} +func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint { + res := []datalayer.Endpoint{} ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if predicate(pm) { - res = append(res, pm) + ep := v.(datalayer.Endpoint) + if predicate(ep) { + res = append(res, ep) } return true }) @@ -268,14 +268,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { if len(ds.pool.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } - pods := []*datalayer.PodInfo{} + pods := []*datalayer.EndpointMetadata{} for idx, port := range ds.pool.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { metricsPort = port } pods = append(pods, - &datalayer.PodInfo{ + &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name + "-rank-" + strconv.Itoa(idx), Namespace: pod.Namespace, @@ -289,28 +289,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { } result := true - for _, podInfo := range pods { - var pm backendmetrics.PodMetrics - existing, ok := ds.pods.Load(podInfo.NamespacedName) + for _, endpointMetadata := range pods { + var ep datalayer.Endpoint + existing, ok := ds.pods.Load(endpointMetadata.NamespacedName) if !ok { - pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds) - ds.pods.Store(podInfo.NamespacedName, pm) + ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds) + ds.pods.Store(endpointMetadata.NamespacedName, ep) result = false } else { - pm = existing.(backendmetrics.PodMetrics) + ep = existing.(backendmetrics.PodMetrics) } - // Update pod properties if anything changed. - pm.UpdatePod(podInfo) + // Update endpoint properties if anything changed. + ep.UpdateMetadata(endpointMetadata) } return result } func (ds *datastore) PodDelete(podName string) { ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if pm.GetPod().PodName == podName { + ep := v.(datalayer.Endpoint) + if ep.GetMetadata().PodName == podName { ds.pods.Delete(k) - ds.epf.ReleaseEndpoint(pm) + ds.epf.ReleaseEndpoint(ep) } return true }) @@ -342,10 +342,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err // Remove pods that don't belong to the pool or not ready any more. ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if exist := activePods[pm.GetPod().PodName]; !exist { - logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod()) - ds.PodDelete(pm.GetPod().PodName) + ep := v.(datalayer.Endpoint) + if exist := activePods[ep.GetMetadata().PodName]; !exist { + logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata().PodName) + ds.PodDelete(ep.GetMetadata().PodName) } return true }) diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 71ea2aa8e..ac41b2084 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -429,7 +429,7 @@ func TestPods(t *testing.T) { test.op(ctx, ds) var gotPods []*corev1.Pod for _, pm := range ds.PodList(AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { @@ -440,18 +440,18 @@ func TestPods(t *testing.T) { } } -func TestPodInfo(t *testing.T) { +func TestEndpointMetadata(t *testing.T) { tests := []struct { - name string - op func(ctx context.Context, ds Datastore) - pool *v1.InferencePool - existingPods []*corev1.Pod - wantPodInfos []*datalayer.PodInfo + name string + op func(ctx context.Context, ds Datastore) + pool *v1.InferencePool + existingPods []*corev1.Pod + wantEndpointMetas []*datalayer.EndpointMetadata }{ { name: "Add new pod, no existing pods, should add", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -473,7 +473,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, no existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -507,7 +507,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, with existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{pod1}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -565,7 +565,7 @@ func TestPodInfo(t *testing.T) { { name: "Delete the pod, multiple target ports", existingPods: []*corev1.Pod{pod1, pod2}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -617,11 +617,11 @@ func TestPodInfo(t *testing.T) { } test.op(ctx, ds) - var gotPodInfos []*datalayer.PodInfo + var gotMetadata []*datalayer.EndpointMetadata for _, pm := range ds.PodList(AllPodsPredicate) { - gotPodInfos = append(gotPodInfos, pm.GetPod()) + gotMetadata = append(gotMetadata, pm.GetMetadata()) } - if diff := cmp.Diff(test.wantPodInfos, gotPodInfos, cmpopts.SortSlices(func(a, b *datalayer.PodInfo) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { + if diff := cmp.Diff(test.wantEndpointMetas, gotMetadata, cmpopts.SortSlices(func(a, b *datalayer.EndpointMetadata) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { t.Errorf("ConvertTo() mismatch (-want +got):\n%s", diff) } }) diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index af6b445ff..becf3705e 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -73,7 +73,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), pool.Name, - pod.GetPod().NamespacedName.Name, + pod.GetMetadata().NamespacedName.Name, ) } } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f093bbfe7..19e69cf2b 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -268,9 +268,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch pm := make([]schedulingtypes.Pod, len(pods)) for i, pod := range pods { if pod.GetAttributes() != nil { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} } else { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} } } @@ -328,7 +328,7 @@ func (d *Director) GetRandomPod() *backend.Pod { } number := rand.Intn(len(pods)) pod := pods[number] - return pod.GetPod() + return pod.GetMetadata() } func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index 80dc13209..f50c81ee4 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -122,7 +122,7 @@ func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[st podTotalCount++ // If the pod's IP is in our allowed map, include it. // Note: We use GetIPAddress() which should align with the subset address. - if pod := pm.GetPod(); pod != nil { + if pod := pm.GetMetadata(); pod != nil { if _, found := endpoints[pod.GetIPAddress()]; found { return true } diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 616debdfc..0f5a5df1c 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -114,7 +114,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { var gotIPs []string for _, pm := range result { - gotIPs = append(gotIPs, pm.GetPod().GetIPAddress()) + gotIPs = append(gotIPs, pm.GetMetadata().GetIPAddress()) } assert.ElementsMatch(t, tc.expectedPodIPs, gotIPs, "Locate returned unexpected set of pods") }) diff --git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 46b94b22c..0891207e9 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -91,8 +91,8 @@ func (d *Detector) IsSaturated(ctx context.Context, candidatePods []backendmetri for _, podMetric := range candidatePods { metrics := podMetric.GetMetrics() podNn := "unknown-pod" - if podMetric.GetPod() != nil { - podNn = podMetric.GetPod().NamespacedName.String() + if podMetric.GetMetadata() != nil { + podNn = podMetric.GetMetadata().NamespacedName.String() } if metrics == nil {