From ec6598b18d2485098e427fd944c0d1f439a09b29 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:25:03 +0200 Subject: [PATCH 01/10] Renamed PodInfo to EndpointMetadata Signed-off-by: Shmuel Kallner --- .../{podinfo.go => endpoint_metadata.go} | 40 +++++++++---------- ...info_test.go => endpoint_metadata_test.go} | 8 ++-- 2 files changed, 24 insertions(+), 24 deletions(-) rename pkg/epp/datalayer/{podinfo.go => endpoint_metadata.go} (63%) rename pkg/epp/datalayer/{podinfo_test.go => endpoint_metadata_test.go} (92%) diff --git a/pkg/epp/datalayer/podinfo.go b/pkg/epp/datalayer/endpoint_metadata.go similarity index 63% rename from pkg/epp/datalayer/podinfo.go rename to pkg/epp/datalayer/endpoint_metadata.go index 7cbd6d886..70c78e0df 100644 --- a/pkg/epp/datalayer/podinfo.go +++ b/pkg/epp/datalayer/endpoint_metadata.go @@ -30,8 +30,8 @@ type Addressable interface { GetNamespacedName() types.NamespacedName } -// PodInfo represents the relevant Kubernetes Pod state of an inference server. -type PodInfo struct { +// EndpointMetadata represents the relevant Kubernetes Pod state of an inference server. +type EndpointMetadata struct { NamespacedName types.NamespacedName PodName string Address string @@ -40,16 +40,16 @@ type PodInfo struct { Labels map[string]string } -// String returns a string representation of the pod. -func (p *PodInfo) String() string { - if p == nil { +// String returns a string representation of the endpoint. +func (e *EndpointMetadata) String() string { + if e == nil { return "" } - return fmt.Sprintf("%+v", *p) + return fmt.Sprintf("%+v", *e) } // Clone returns a full copy of the object. 
-func (p *PodInfo) Clone() *PodInfo { +func (p *EndpointMetadata) Clone() *EndpointMetadata { if p == nil { return nil } @@ -58,7 +58,7 @@ func (p *PodInfo) Clone() *PodInfo { for key, value := range p.Labels { clonedLabels[key] = value } - return &PodInfo{ + return &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: p.NamespacedName.Name, Namespace: p.NamespacedName.Namespace, @@ -71,22 +71,22 @@ func (p *PodInfo) Clone() *PodInfo { } } -// GetNamespacedName gets the namespace name of the Pod. -func (p *PodInfo) GetNamespacedName() types.NamespacedName { - return p.NamespacedName +// GetNamespacedName returns the namespaced name of the Endpoint. +func (e *EndpointMetadata) GetNamespacedName() types.NamespacedName { + return e.NamespacedName } -// GetIPAddress returns the Pod's IP address. -func (p *PodInfo) GetIPAddress() string { - return p.Address +// GetIPAddress returns the Endpoint's IP address. +func (e *EndpointMetadata) GetIPAddress() string { + return e.Address } -// GetPort returns the Pod's inference port. -func (p *PodInfo) GetPort() string { - return p.Port +// GetPort returns the Endpoint's inference port. 
+func (e *EndpointMetadata) GetPort() string { + return e.Port } -// GetMetricsHost returns the pod's metrics host (ip:port) -func (p *PodInfo) GetMetricsHost() string { - return p.MetricsHost +// GetMetricsHost returns the Endpoint's metrics host (ip:port) +func (e *EndpointMetadata) GetMetricsHost() string { + return e.MetricsHost } diff --git a/pkg/epp/datalayer/podinfo_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go similarity index 92% rename from pkg/epp/datalayer/podinfo_test.go rename to pkg/epp/datalayer/endpoint_metadata_test.go index baf804a22..ad25a29f4 100644 --- a/pkg/epp/datalayer/podinfo_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -48,14 +48,14 @@ var ( PodIP: podip, }, } - expected = &PodInfo{ + expected = &EndpointMetadata{ NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, Address: podip, Labels: labels, } ) -func TestPodInfoClone(t *testing.T) { +func TestEndpointMetadataClone(t *testing.T) { clone := expected.Clone() assert.NotSame(t, expected, clone) if diff := cmp.Diff(expected, clone); diff != "" { @@ -67,7 +67,7 @@ func TestPodInfoClone(t *testing.T) { } func TestPodInfoString(t *testing.T) { - podinfo := PodInfo{ + endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, @@ -79,7 +79,7 @@ func TestPodInfoString(t *testing.T) { Labels: labels, } - s := podinfo.String() + s := endpointMetadata.String() assert.Contains(t, s, name) assert.Contains(t, s, namespace) assert.Contains(t, s, podip) From af9cdbdedcc2130f1b3592085b2d908a31c363db Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:26:54 +0200 Subject: [PATCH 02/10] Non test code updates due to PodInfo rename Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/pod_metrics.go | 14 ++--- pkg/epp/backend/metrics/types.go | 2 +- pkg/epp/backend/pod.go | 2 +- pkg/epp/datalayer/collector.go | 2 +- pkg/epp/datalayer/endpoint.go | 18 +++--- 
pkg/epp/datalayer/factory.go | 10 ++-- pkg/epp/datalayer/metrics/datasource.go | 4 +- pkg/epp/datastore/datastore.go | 44 +++++++-------- pkg/epp/metrics/collectors/inference_pool.go | 2 +- pkg/epp/requestcontrol/director.go | 55 ++++++++++++++++++- .../saturationdetector/saturationdetector.go | 4 +- 11 files changed, 103 insertions(+), 54 deletions(-) diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index 4d22ef18c..c5d1393cd 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -53,10 +53,10 @@ type PodMetricsClient interface { } func (pm *podMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } -func (pm *podMetrics) GetPod() *backend.Pod { +func (pm *podMetrics) GetMetadata() *backend.Pod { return pm.pod.Load() } @@ -64,7 +64,7 @@ func (pm *podMetrics) GetMetrics() *MetricsState { return pm.metrics.Load() } -func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { +func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { pm.pod.Store(pod) } @@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { func (pm *podMetrics) startRefreshLoop(ctx context.Context) { pm.startOnce.Do(func() { go func() { - pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata()) ticker := time.NewTicker(pm.interval) defer ticker.Stop() for { @@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { return case <-ticker.C: // refresh metrics periodically if err := pm.refreshMetrics(); err != nil { - pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod()) + pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata()) } } } @@ -95,7 +95,7 @@ func (pm 
*podMetrics) startRefreshLoop(ctx context.Context) { func (pm *podMetrics) refreshMetrics() error { ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() - updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics()) + updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics()) if err != nil { pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err) } @@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error { } func (pm *podMetrics) stopRefreshLoop() { - pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata()) pm.stopOnce.Do(func() { close(pm.done) }) diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 2c334a750..6334bcab1 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -48,7 +48,7 @@ type PodMetricsFactory struct { refreshMetricsInterval time.Duration } -func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics { +func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics { pm := &podMetrics{ pmc: f.pmc, ds: ds, diff --git a/pkg/epp/backend/pod.go b/pkg/epp/backend/pod.go index 324a7479a..e24494042 100644 --- a/pkg/epp/backend/pod.go +++ b/pkg/epp/backend/pod.go @@ -20,4 +20,4 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) -type Pod = datalayer.PodInfo +type Pod = datalayer.EndpointMetadata diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 86a8f7b4e..f7be8125b 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc started := false c.startOnce.Do(func() { - logger := 
log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress()) c.ctx, c.cancel = context.WithCancel(ctx) started = true ready = make(chan struct{}) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 2d262eb3a..60388890e 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -21,10 +21,10 @@ import ( "sync/atomic" ) -// EndpointPodState allows management of the Pod related attributes. -type EndpointPodState interface { - GetPod() *PodInfo - UpdatePod(*PodInfo) +// EndpointMetaState allows management of the EndpointMetadata related attributes. +type EndpointMetaState interface { + GetMetadata() *EndpointMetadata + UpdateMetadata(*EndpointMetadata) GetAttributes() *Attributes } @@ -37,14 +37,14 @@ type EndpointMetricsState interface { // Endpoint represents an inference serving endpoint and its related attributes. type Endpoint interface { fmt.Stringer - EndpointPodState + EndpointMetaState EndpointMetricsState AttributeMap } // ModelServer is an implementation of the Endpoint interface. type ModelServer struct { - pod atomic.Pointer[PodInfo] + pod atomic.Pointer[EndpointMetadata] metrics atomic.Pointer[Metrics] attributes *Attributes } @@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { // String returns a representation of the ModelServer. For brevity, only names of // extended attributes are returned and not their values. 
func (srv *ModelServer) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys()) + return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) } -func (srv *ModelServer) GetPod() *PodInfo { +func (srv *ModelServer) GetMetadata() *EndpointMetadata { return srv.pod.Load() } -func (srv *ModelServer) UpdatePod(pod *PodInfo) { +func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) { srv.pod.Store(pod) } diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 3a81763d5..78765095c 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -46,7 +46,7 @@ type PoolInfo interface { // providing methods to allocate and retire endpoints. This can potentially be used for // pooled memory or other management chores in the implementation. type EndpointFactory interface { - NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint + NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint ReleaseEndpoint(ep Endpoint) } @@ -71,8 +71,8 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati // NewEndpoint implements EndpointFactory.NewEndpoint. // Creates a new endpoint and starts its associated collector with its own ticker. // Guards against multiple concurrent calls for the same endpoint. 
-func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint { - key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name} +func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint { + key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name} logger := log.FromContext(parent).WithValues("pod", key) if _, ok := lc.collectors.Load(key); ok { @@ -80,7 +80,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, return nil } - endpoint := NewEndpoint(inpod, nil) + endpoint := NewEndpoint(inEndpointMetadata, nil) collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded { @@ -102,7 +102,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, // ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint // Stops the collector and cleans up resources for the endpoint func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) { - key := ep.GetPod().GetNamespacedName() + key := ep.GetMetadata().GetNamespacedName() if value, ok := lc.collectors.LoadAndDelete(key); ok { collector := value.(*Collector) diff --git a/pkg/epp/datalayer/metrics/datasource.go b/pkg/epp/datalayer/metrics/datasource.go index df4b1d378..5e79ba599 100644 --- a/pkg/epp/datalayer/metrics/datasource.go +++ b/pkg/epp/datalayer/metrics/datasource.go @@ -94,8 +94,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error { // Collect is triggered by the data layer framework to fetch potentially new // MSP metrics data for an endpoint. 
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error { - target := dataSrc.getMetricsEndpoint(ep.GetPod()) - families, err := dataSrc.client.Get(ctx, target, ep.GetPod()) + target := dataSrc.getMetricsEndpoint(ep.GetMetadata()) + families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata()) if err != nil { return err diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index e1e995ffe..6f2345e78 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -240,13 +240,13 @@ func (ds *datastore) ModelRewriteGetAll() []*v1alpha2.InferenceModelRewrite { // /// Pods/endpoints APIs /// // TODO: add a flag for callers to specify the staleness threshold for metrics. // ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694 -func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { - res := []backendmetrics.PodMetrics{} +func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint { + res := []datalayer.Endpoint{} ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if predicate(pm) { - res = append(res, pm) + ep := v.(datalayer.Endpoint) + if predicate(ep) { + res = append(res, ep) } return true }) @@ -268,14 +268,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { if len(ds.pool.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } - pods := []*datalayer.PodInfo{} + pods := []*datalayer.EndpointMetadata{} for idx, port := range ds.pool.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { metricsPort = port } pods = append(pods, - &datalayer.PodInfo{ + &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name + "-rank-" + strconv.Itoa(idx), Namespace: pod.Namespace, @@ -289,28 +289,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) 
bool { } result := true - for _, podInfo := range pods { - var pm backendmetrics.PodMetrics - existing, ok := ds.pods.Load(podInfo.NamespacedName) + for _, endpointMetadata := range pods { + var ep datalayer.Endpoint + existing, ok := ds.pods.Load(endpointMetadata.NamespacedName) if !ok { - pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds) - ds.pods.Store(podInfo.NamespacedName, pm) + ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds) + ds.pods.Store(endpointMetadata.NamespacedName, ep) result = false } else { - pm = existing.(backendmetrics.PodMetrics) + ep = existing.(backendmetrics.PodMetrics) } - // Update pod properties if anything changed. - pm.UpdatePod(podInfo) + // Update endpoint properties if anything changed. + ep.UpdateMetadata(endpointMetadata) } return result } func (ds *datastore) PodDelete(podName string) { ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if pm.GetPod().PodName == podName { + ep := v.(datalayer.Endpoint) + if ep.GetMetadata().PodName == podName { ds.pods.Delete(k) - ds.epf.ReleaseEndpoint(pm) + ds.epf.ReleaseEndpoint(ep) } return true }) @@ -342,10 +342,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err // Remove pods that don't belong to the pool or not ready any more. 
ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if exist := activePods[pm.GetPod().PodName]; !exist { - logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod()) - ds.PodDelete(pm.GetPod().PodName) + ep := v.(datalayer.Endpoint) + if exist := activePods[ep.GetMetadata().PodName]; !exist { + logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata()) + ds.PodDelete(ep.GetMetadata().PodName) } return true }) diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index af6b445ff..becf3705e 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -73,7 +73,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), pool.Name, - pod.GetPod().NamespacedName.Name, + pod.GetMetadata().NamespacedName.Name, ) } } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f093bbfe7..e8822d270 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -235,6 +235,55 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string { return models[len(models)-1].ModelRewrite } +<<<<<<< HEAD +======= +// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore. +// according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies +// a subset of endpoints, only these endpoints will be considered as candidates for the scheduler. +// Snapshot pod metrics from the datastore to: +// 1. Reduce concurrent access to the datastore. +// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. 
+func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { + loggerTrace := log.FromContext(ctx).V(logutil.TRACE) + + subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any) + if !found { + return d.datastore.PodList(datastore.AllPodsPredicate) + } + + // Check if endpoint key is present in the subset map and ensure there is at least one value + endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) + if !found { + return d.datastore.PodList(datastore.AllPodsPredicate) + } else if len(endpointSubsetList) == 0 { + loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") + return []backendmetrics.PodMetrics{} + } + + // Create a map of endpoint addresses for easy lookup + endpoints := make(map[string]bool) + for _, endpoint := range endpointSubsetList { + // Extract address from endpoint + // The endpoint is formatted as "
:" (ex. "10.0.1.0:8080") + epStr := strings.Split(endpoint.(string), ":")[0] + endpoints[epStr] = true + } + + podTotalCount := 0 + podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { + podTotalCount++ + if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found { + return true + } + return false + }) + + loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList)) + + return podFilteredList +} + +>>>>>>> 83c41ae (Non test code updates due to PodInfo rename) // prepareRequest populates the RequestContext and calls the registered PreRequest plugins // for allowing plugging customized logic based on the scheduling result. func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) { @@ -268,9 +317,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch pm := make([]schedulingtypes.Pod, len(pods)) for i, pod := range pods { if pod.GetAttributes() != nil { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} } else { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} } } @@ -328,7 +377,7 @@ func (d *Director) GetRandomPod() *backend.Pod { } number := rand.Intn(len(pods)) pod := pods[number] - return pod.GetPod() + return pod.GetMetadata() } func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, diff 
--git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 46b94b22c..0891207e9 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -91,8 +91,8 @@ func (d *Detector) IsSaturated(ctx context.Context, candidatePods []backendmetri for _, podMetric := range candidatePods { metrics := podMetric.GetMetrics() podNn := "unknown-pod" - if podMetric.GetPod() != nil { - podNn = podMetric.GetPod().NamespacedName.String() + if podMetric.GetMetadata() != nil { + podNn = podMetric.GetMetadata().NamespacedName.String() } if metrics == nil { From ac7ce6c2115c71a3c7eeb21cd113d8c9025fa97b Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 16 Nov 2025 12:27:44 +0200 Subject: [PATCH 03/10] Test updates due to PodInfo rename Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/fake.go | 6 +-- pkg/epp/backend/metrics/pod_metrics_test.go | 2 +- .../inferencepool_reconciler_test.go | 28 +++++------ pkg/epp/controller/pod_reconciler_test.go | 2 +- pkg/epp/datalayer/collector_test.go | 4 +- pkg/epp/datastore/datastore_test.go | 26 +++++----- pkg/epp/requestcontrol/director.go | 49 ------------------- 7 files changed, 34 insertions(+), 83 deletions(-) diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 1c7a90528..018241ea8 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -38,10 +38,10 @@ type FakePodMetrics struct { } func (fpm *FakePodMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics()) } -func (fpm *FakePodMetrics) GetPod() *backend.Pod { +func (fpm *FakePodMetrics) GetMetadata() *backend.Pod { return fpm.Pod } @@ -49,7 +49,7 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { return fpm.Metrics } -func (fpm *FakePodMetrics) UpdatePod(pod 
*datalayer.PodInfo) { +func (fpm *FakePodMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { fpm.Pod = pod } func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 8a5561c0e..8d84701fc 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -29,7 +29,7 @@ import ( ) var ( - pod1Info = &datalayer.PodInfo{ + pod1Info = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1-rank-0", Namespace: "default", diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 4d1f22348..dfe8fabfa 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -128,7 +128,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } endpointPool1 := pool.InferencePoolToEndpointPool(pool1) - if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -146,7 +146,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -162,7 +162,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool 
reconcile error: %v", err) } newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -176,7 +176,7 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } @@ -184,7 +184,7 @@ func TestInferencePoolReconciler(t *testing.T) { type diffStoreParams struct { wantPool *datalayer.EndpointPool - wantPods []string + wantEndpoints []string wantObjectives []*v1alpha2.InferenceObjective } @@ -195,15 +195,15 @@ func diffStore(store datastore.Datastore, params diffStoreParams) string { } // Default wantPods if not set because PodGetAll returns an empty slice when empty. 
- if params.wantPods == nil { - params.wantPods = []string{} + if params.wantEndpoints == nil { + params.wantEndpoints = []string{} } - gotPods := []string{} - for _, pm := range store.PodList(datastore.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + gotEndpoints := []string{} + for _, em := range store.PodList(datastore.AllPodsPredicate) { + gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name) } - if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { - return "pods:" + diff + if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "endpoints:" + diff } // Default wantModels if not set because ModelGetAll returns an empty slice when empty. @@ -355,8 +355,8 @@ func xDiffStore(store datastore.Datastore, params xDiffStoreParams) string { params.wantPods = []string{} } gotPods := []string{} - for _, pm := range store.PodList(datastore.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + for _, em := range store.PodList(datastore.AllPodsPredicate) { + gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { return "pods:" + diff diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 11b52578c..f73b4e645 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -219,7 +219,7 @@ func TestPodReconciler(t *testing.T) { var gotPods []*corev1.Pod for _, pm := range store.PodList(datastore.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := 
&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { diff --git a/pkg/epp/datalayer/collector_test.go b/pkg/epp/datalayer/collector_test.go index f05175f54..7752ebc7b 100644 --- a/pkg/epp/datalayer/collector_test.go +++ b/pkg/epp/datalayer/collector_test.go @@ -32,14 +32,14 @@ import ( // --- Test Stubs --- func defaultEndpoint() Endpoint { - pod := &PodInfo{ + meta := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod-name", Namespace: "default", }, Address: "1.2.3.4:5678", } - ms := NewEndpoint(pod, nil) + ms := NewEndpoint(meta, nil) return ms } diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 71ea2aa8e..834764b01 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -429,7 +429,7 @@ func TestPods(t *testing.T) { test.op(ctx, ds) var gotPods []*corev1.Pod for _, pm := range ds.PodList(AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { @@ -442,16 +442,16 @@ func TestPods(t *testing.T) { func TestPodInfo(t *testing.T) { tests := []struct { - name string - op func(ctx context.Context, ds Datastore) - pool *v1.InferencePool - existingPods []*corev1.Pod - wantPodInfos 
[]*datalayer.PodInfo + name string + op func(ctx context.Context, ds Datastore) + pool *v1.InferencePool + existingPods []*corev1.Pod + wantEndpointMetas []*datalayer.EndpointMetadata }{ { name: "Add new pod, no existing pods, should add", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -473,7 +473,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, no existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -507,7 +507,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, with existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{pod1}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -565,7 +565,7 @@ func TestPodInfo(t *testing.T) { { name: "Delete the pod, multiple target ports", existingPods: []*corev1.Pod{pod1, pod2}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -617,11 +617,11 @@ func TestPodInfo(t *testing.T) { } test.op(ctx, ds) - var gotPodInfos []*datalayer.PodInfo + var gotMetadata []*datalayer.EndpointMetadata for _, pm := range ds.PodList(AllPodsPredicate) { - gotPodInfos = append(gotPodInfos, pm.GetPod()) + gotMetadata = append(gotMetadata, pm.GetMetadata()) } - if diff := cmp.Diff(test.wantPodInfos, gotPodInfos, cmpopts.SortSlices(func(a, b *datalayer.PodInfo) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { + if diff := cmp.Diff(test.wantEndpointMetas, gotMetadata, cmpopts.SortSlices(func(a, b *datalayer.EndpointMetadata) bool { 
return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { t.Errorf("ConvertTo() mismatch (-want +got):\n%s", diff) } }) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index e8822d270..19e69cf2b 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -235,55 +235,6 @@ func (d *Director) selectWeightedModel(models []v1alpha2.TargetModel) string { return models[len(models)-1].ModelRewrite } -<<<<<<< HEAD -======= -// getCandidatePodsForScheduling gets the list of relevant endpoints for the scheduling cycle from the datastore. -// according to EPP protocol, if "x-gateway-destination-endpoint-subset" is set on the request metadata and specifies -// a subset of endpoints, only these endpoints will be considered as candidates for the scheduler. -// Snapshot pod metrics from the datastore to: -// 1. Reduce concurrent access to the datastore. -// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles. 
-func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []backendmetrics.PodMetrics { - loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - - subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any) - if !found { - return d.datastore.PodList(datastore.AllPodsPredicate) - } - - // Check if endpoint key is present in the subset map and ensure there is at least one value - endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any) - if !found { - return d.datastore.PodList(datastore.AllPodsPredicate) - } else if len(endpointSubsetList) == 0 { - loggerTrace.Info("found empty subset filter in request metadata, filtering all pods") - return []backendmetrics.PodMetrics{} - } - - // Create a map of endpoint addresses for easy lookup - endpoints := make(map[string]bool) - for _, endpoint := range endpointSubsetList { - // Extract address from endpoint - // The endpoint is formatted as "
:" (ex. "10.0.1.0:8080") - epStr := strings.Split(endpoint.(string), ":")[0] - endpoints[epStr] = true - } - - podTotalCount := 0 - podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { - podTotalCount++ - if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found { - return true - } - return false - }) - - loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFilteredList)) - - return podFilteredList -} - ->>>>>>> 83c41ae (Non test code updates due to PodInfo rename) // prepareRequest populates the RequestContext and calls the registered PreRequest plugins // for allowing plugging customized logic based on the scheduling result. func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) { From 7f5098f48fa86ffb914616982888f978ea317103 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Tue, 18 Nov 2025 13:24:09 +0200 Subject: [PATCH 04/10] Updated new code with new struct names Signed-off-by: Shmuel Kallner --- pkg/epp/datalayer/endpoint.go | 8 ++++---- pkg/epp/datalayer/factory_test.go | 4 ++-- pkg/epp/datalayer/metrics/datasource_test.go | 2 +- pkg/epp/datalayer/metrics/extractor.go | 2 +- pkg/epp/datalayer/metrics/logger_test.go | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 60388890e..8300771a2 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -50,9 +50,9 @@ type ModelServer struct { } // NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. 
-func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { - if pod == nil { - pod = &PodInfo{} +func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { + if meta == nil { + meta = &EndpointMetadata{} } if metrics == nil { metrics = NewMetrics() @@ -60,7 +60,7 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { ep := &ModelServer{ attributes: NewAttributes(), } - ep.UpdatePod(pod) + ep.UpdateMetadata(meta) ep.UpdateMetrics(metrics) return ep } diff --git a/pkg/epp/datalayer/factory_test.go b/pkg/epp/datalayer/factory_test.go index 302baa396..89a51ffd2 100644 --- a/pkg/epp/datalayer/factory_test.go +++ b/pkg/epp/datalayer/factory_test.go @@ -31,7 +31,7 @@ func TestFactory(t *testing.T) { source := &FakeDataSource{} factory := NewEndpointFactory([]DataSource{source}, 100*time.Millisecond) - pod1 := &PodInfo{ + pod1 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", @@ -44,7 +44,7 @@ func TestFactory(t *testing.T) { dup := factory.NewEndpoint(context.Background(), pod1, nil) assert.Nil(t, dup, "expected to fail to create a duplicate collector") - pod2 := &PodInfo{ + pod2 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/datasource_test.go b/pkg/epp/datalayer/metrics/datasource_test.go index 35255b7bc..9a834df55 100644 --- a/pkg/epp/datalayer/metrics/datasource_test.go +++ b/pkg/epp/datalayer/metrics/datasource_test.go @@ -50,7 +50,7 @@ func TestDatasource(t *testing.T) { ctx := context.Background() factory := datalayer.NewEndpointFactory([]datalayer.DataSource{source}, 100*time.Millisecond) - pod := &datalayer.PodInfo{ + pod := &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index 5404b3b62..2c40c9600 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ 
b/pkg/epp/datalayer/metrics/extractor.go @@ -153,7 +153,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } - logger := log.FromContext(ctx).WithValues("pod", ep.GetPod().NamespacedName) + logger := log.FromContext(ctx).WithValues("pod", ep.GetMetadata().NamespacedName) if updated { clone.UpdateTime = time.Now() logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone) diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index 4bf68cf0a..b060a5e2f 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -79,14 +79,14 @@ func TestLogger(t *testing.T) { assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } -var pod1 = &datalayer.PodInfo{ +var pod1 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", }, Address: "1.2.3.4:5678", } -var pod2 = &datalayer.PodInfo{ +var pod2 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", From 5f26a8b1b7fd070aa96fdbd5f24dc34bd0d9af62 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Wed, 19 Nov 2025 14:16:47 +0200 Subject: [PATCH 05/10] Changed PodInfo to EndpointMetadata in comments and test function names Signed-off-by: Shmuel Kallner --- pkg/epp/datalayer/endpoint.go | 2 +- pkg/epp/datalayer/endpoint_metadata_test.go | 2 +- pkg/epp/datastore/datastore_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 8300771a2..1ba7f939a 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -49,7 +49,7 @@ type ModelServer struct { attributes *Attributes } -// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. +// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics. 
func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { if meta == nil { meta = &EndpointMetadata{} diff --git a/pkg/epp/datalayer/endpoint_metadata_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go index ad25a29f4..fb9bc7a93 100644 --- a/pkg/epp/datalayer/endpoint_metadata_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -66,7 +66,7 @@ func TestEndpointMetadataClone(t *testing.T) { assert.Equal(t, "prod", expected.Labels["env"], "mutating clone should not affect original") } -func TestPodInfoString(t *testing.T) { +func TestEndpointMetadataString(t *testing.T) { endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 834764b01..ac41b2084 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -440,7 +440,7 @@ func TestPods(t *testing.T) { } } -func TestPodInfo(t *testing.T) { +func TestEndpointMetadata(t *testing.T) { tests := []struct { name string op func(ctx context.Context, ds Datastore) From de2d347acb17036c58cb40b45e6319eeb4f01f65 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 23 Nov 2025 12:57:03 +0200 Subject: [PATCH 06/10] Updates from review comments Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/pod_metrics.go | 11 +++++------ pkg/epp/backend/metrics/types.go | 6 +++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index c5d1393cd..b99392af6 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -25,7 +25,6 @@ import ( "github.com/go-logr/logr" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -35,7 +34,7 @@ const ( ) type podMetrics struct { 
- pod atomic.Pointer[backend.Pod] + metadata atomic.Pointer[datalayer.EndpointMetadata] metrics atomic.Pointer[MetricsState] pmc PodMetricsClient ds datalayer.PoolInfo @@ -49,15 +48,15 @@ type podMetrics struct { } type PodMetricsClient interface { - FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState) (*MetricsState, error) + FetchMetrics(ctx context.Context, pod *datalayer.EndpointMetadata, existing *MetricsState) (*MetricsState, error) } func (pm *podMetrics) String() string { return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } -func (pm *podMetrics) GetMetadata() *backend.Pod { - return pm.pod.Load() +func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata { + return pm.metadata.Load() } func (pm *podMetrics) GetMetrics() *MetricsState { @@ -65,7 +64,7 @@ func (pm *podMetrics) GetMetrics() *MetricsState { } func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { - pm.pod.Store(pod) + pm.metadata.Store(pod) } // start starts a goroutine exactly once to periodically update metrics. 
The goroutine will be diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 6334bcab1..e21d4aa03 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -48,7 +48,7 @@ type PodMetricsFactory struct { refreshMetricsInterval time.Duration } -func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics { +func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *datalayer.EndpointMetadata, ds datalayer.PoolInfo) datalayer.Endpoint { pm := &podMetrics{ pmc: f.pmc, ds: ds, @@ -56,9 +56,9 @@ func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalaye startOnce: sync.Once{}, stopOnce: sync.Once{}, done: make(chan struct{}), - logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName), + logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName), } - pm.pod.Store(pod) + pm.metadata.Store(metadata) pm.metrics.Store(NewMetricsState()) pm.startRefreshLoop(parentCtx) From 192584bc8c57ed97528ceb83a4e63a8db54049b0 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Mon, 1 Dec 2025 18:48:02 +0200 Subject: [PATCH 07/10] Updated code from rebase Signed-off-by: Shmuel Kallner --- cmd/epp/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index c84ae24fa..e78b6877c 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -489,7 +489,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName { names := make([]types.NamespacedName, 0, len(pods)) for _, p := range pods { - names = append(names, p.GetPod().NamespacedName) + names = append(names, p.GetMetadata().NamespacedName) } return names } From 0827910280983a8f74fa2e82c49a02203ae184bc Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Tue, 2 Dec 2025 18:31:21 +0200 Subject: [PATCH 08/10] Make messages 
and variable names somewhat more consistent Signed-off-by: Shmuel Kallner --- pkg/epp/backend/metrics/fake.go | 6 +++--- pkg/epp/backend/metrics/pod_metrics.go | 8 ++++---- pkg/epp/datalayer/endpoint.go | 2 +- pkg/epp/datalayer/metrics/extractor.go | 2 +- pkg/epp/datalayer/metrics/logger_test.go | 4 ++-- pkg/epp/datastore/datastore.go | 2 +- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 018241ea8..a61e4ee74 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -38,7 +38,7 @@ type FakePodMetrics struct { } func (fpm *FakePodMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics()) + return fmt.Sprintf("Metadata: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics()) } func (fpm *FakePodMetrics) GetMetadata() *backend.Pod { @@ -49,8 +49,8 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { return fpm.Metrics } -func (fpm *FakePodMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { - fpm.Pod = pod +func (fpm *FakePodMetrics) UpdateMetadata(metadata *datalayer.EndpointMetadata) { + fpm.Pod = metadata } func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { return fpm.Attributes diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index b99392af6..5968842b3 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -52,7 +52,7 @@ type PodMetricsClient interface { } func (pm *podMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) + return fmt.Sprintf("Metadata: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata { @@ -72,7 +72,7 @@ func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { func (pm *podMetrics) startRefreshLoop(ctx context.Context) { 
pm.startOnce.Do(func() { go func() { - pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata()) + pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "metadata", pm.GetMetadata()) ticker := time.NewTicker(pm.interval) defer ticker.Stop() for { @@ -83,7 +83,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { return case <-ticker.C: // refresh metrics periodically if err := pm.refreshMetrics(); err != nil { - pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata()) + pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "metadata", pm.GetMetadata()) } } } @@ -114,7 +114,7 @@ func (pm *podMetrics) refreshMetrics() error { } func (pm *podMetrics) stopRefreshLoop() { - pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata()) + pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "metadata", pm.GetMetadata()) pm.stopOnce.Do(func() { close(pm.done) }) diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 1ba7f939a..67ad56010 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -68,7 +68,7 @@ func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { // String returns a representation of the ModelServer. For brevity, only names of // extended attributes are returned and not their values. 
func (srv *ModelServer) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) + return fmt.Sprintf("Metadata: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) } func (srv *ModelServer) GetMetadata() *EndpointMetadata { diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index 2c40c9600..e296cd3bc 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ b/pkg/epp/datalayer/metrics/extractor.go @@ -153,7 +153,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } - logger := log.FromContext(ctx).WithValues("pod", ep.GetMetadata().NamespacedName) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().NamespacedName) if updated { clone.UpdateTime = time.Now() logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone) diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index b060a5e2f..0c0a20761 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -72,10 +72,10 @@ func TestLogger(t *testing.T) { logOutput := b.read() assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}") - assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Pod: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") + assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5") assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048") - assert.Contains(t, logOutput, "Pod: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") + assert.Contains(t, 
logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 6f2345e78..fa879b5cc 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -344,7 +344,7 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err ds.pods.Range(func(k, v any) bool { ep := v.(datalayer.Endpoint) if exist := activePods[ep.GetMetadata().PodName]; !exist { - logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata()) + logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata().PodName) ds.PodDelete(ep.GetMetadata().PodName) } return true From 6fad61d7d56e84419067bf3ec325b073626d0e58 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 7 Dec 2025 12:22:56 +0200 Subject: [PATCH 09/10] Rebase correction that was missed Signed-off-by: Shmuel Kallner --- pkg/epp/controller/pod_reconciler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index f73b4e645..02ed8a36c 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -219,7 +219,7 @@ func TestPodReconciler(t *testing.T) { var gotPods []*corev1.Pod for _, pm := range store.PodList(datastore.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < 
b.Name })) { From 227243c91f6e8e5cf923f9c8e572cf7c841fd970 Mon Sep 17 00:00:00 2001 From: Shmuel Kallner Date: Sun, 7 Dec 2025 12:23:46 +0200 Subject: [PATCH 10/10] Rebase corrections due to other changes Signed-off-by: Shmuel Kallner --- pkg/epp/datalayer/fake.go | 4 ++-- pkg/epp/requestcontrol/locator.go | 2 +- pkg/epp/requestcontrol/locator_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/epp/datalayer/fake.go b/pkg/epp/datalayer/fake.go index 24d01c139..c82fd6768 100644 --- a/pkg/epp/datalayer/fake.go +++ b/pkg/epp/datalayer/fake.go @@ -49,8 +49,8 @@ func (fds *FakeDataSource) AddExtractor(_ Extractor) error { return nil } func (fds *FakeDataSource) Collect(ctx context.Context, ep Endpoint) error { atomic.AddInt64(&fds.callCount, 1) - if metrics, ok := fds.Metrics[ep.GetPod().Clone().NamespacedName]; ok { - if _, ok := fds.Errors[ep.GetPod().Clone().NamespacedName]; !ok { + if metrics, ok := fds.Metrics[ep.GetMetadata().Clone().NamespacedName]; ok { + if _, ok := fds.Errors[ep.GetMetadata().Clone().NamespacedName]; !ok { ep.UpdateMetrics(metrics) } } diff --git a/pkg/epp/requestcontrol/locator.go b/pkg/epp/requestcontrol/locator.go index 80dc13209..f50c81ee4 100644 --- a/pkg/epp/requestcontrol/locator.go +++ b/pkg/epp/requestcontrol/locator.go @@ -122,7 +122,7 @@ func (d *DatastorePodLocator) Locate(ctx context.Context, requestMetadata map[st podTotalCount++ // If the pod's IP is in our allowed map, include it. // Note: We use GetIPAddress() which should align with the subset address. 
- if pod := pm.GetPod(); pod != nil { + if pod := pm.GetMetadata(); pod != nil { if _, found := endpoints[pod.GetIPAddress()]; found { return true } diff --git a/pkg/epp/requestcontrol/locator_test.go b/pkg/epp/requestcontrol/locator_test.go index 616debdfc..0f5a5df1c 100644 --- a/pkg/epp/requestcontrol/locator_test.go +++ b/pkg/epp/requestcontrol/locator_test.go @@ -114,7 +114,7 @@ func TestDatastorePodLocator_Locate(t *testing.T) { var gotIPs []string for _, pm := range result { - gotIPs = append(gotIPs, pm.GetPod().GetIPAddress()) + gotIPs = append(gotIPs, pm.GetMetadata().GetIPAddress()) } assert.ElementsMatch(t, tc.expectedPodIPs, gotIPs, "Locate returned unexpected set of pods") })