Skip to content

Commit a3968a8

Browse files
committed
Non test code updates due to PodInfo rename
Signed-off-by: Shmuel Kallner <kallner@il.ibm.com>
1 parent abe1a89 commit a3968a8

File tree

11 files changed

+55
-55
lines changed

11 files changed

+55
-55
lines changed

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ type PodMetricsClient interface {
5353
}
5454

5555
func (pm *podMetrics) String() string {
56-
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
56+
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
5757
}
5858

59-
func (pm *podMetrics) GetPod() *backend.Pod {
59+
func (pm *podMetrics) GetMetadata() *backend.Pod {
6060
return pm.pod.Load()
6161
}
6262

6363
func (pm *podMetrics) GetMetrics() *MetricsState {
6464
return pm.metrics.Load()
6565
}
6666

67-
func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
67+
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
6868
pm.pod.Store(pod)
6969
}
7070

@@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
7373
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
7474
pm.startOnce.Do(func() {
7575
go func() {
76-
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
76+
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata())
7777
ticker := time.NewTicker(pm.interval)
7878
defer ticker.Stop()
7979
for {
@@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
8484
return
8585
case <-ticker.C: // refresh metrics periodically
8686
if err := pm.refreshMetrics(); err != nil {
87-
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
87+
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata())
8888
}
8989
}
9090
}
@@ -95,7 +95,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
9595
func (pm *podMetrics) refreshMetrics() error {
9696
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
9797
defer cancel()
98-
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
98+
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
9999
if err != nil {
100100
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
101101
}
@@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error {
115115
}
116116

117117
func (pm *podMetrics) stopRefreshLoop() {
118-
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
118+
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata())
119119
pm.stopOnce.Do(func() {
120120
close(pm.done)
121121
})

pkg/epp/backend/metrics/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type PodMetricsFactory struct {
5252
refreshMetricsInterval time.Duration
5353
}
5454

55-
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
55+
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics {
5656
pm := &podMetrics{
5757
pmc: f.pmc,
5858
ds: ds,

pkg/epp/backend/pod.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2121
)
2222

23-
type Pod = datalayer.PodInfo
23+
type Pod = datalayer.EndpointMetadata

pkg/epp/datalayer/collector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
8888
started := false
8989

9090
c.startOnce.Do(func() {
91-
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
91+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
9292
c.ctx, c.cancel = context.WithCancel(ctx)
9393
started = true
9494
ready = make(chan struct{})

pkg/epp/datalayer/endpoint.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ import (
2121
"sync/atomic"
2222
)
2323

24-
// EndpointPodState allows management of the Pod related attributes.
25-
type EndpointPodState interface {
26-
GetPod() *PodInfo
27-
UpdatePod(*PodInfo)
24+
// EndpointMetaState allows management of the EndpointMetadata related attributes.
25+
type EndpointMetaState interface {
26+
GetMetadata() *EndpointMetadata
27+
UpdateMetadata(*EndpointMetadata)
2828
GetAttributes() *Attributes
2929
}
3030

@@ -37,14 +37,14 @@ type EndpointMetricsState interface {
3737
// Endpoint represents an inference serving endpoint and its related attributes.
3838
type Endpoint interface {
3939
fmt.Stringer
40-
EndpointPodState
40+
EndpointMetaState
4141
EndpointMetricsState
4242
AttributeMap
4343
}
4444

4545
// ModelServer is an implementation of the Endpoint interface.
4646
type ModelServer struct {
47-
pod atomic.Pointer[PodInfo]
47+
pod atomic.Pointer[EndpointMetadata]
4848
metrics atomic.Pointer[Metrics]
4949
attributes *Attributes
5050
}
@@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
6868
// String returns a representation of the ModelServer. For brevity, only names of
6969
// extended attributes are returned and not their values.
7070
func (srv *ModelServer) String() string {
71-
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
71+
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
7272
}
7373

74-
func (srv *ModelServer) GetPod() *PodInfo {
74+
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
7575
return srv.pod.Load()
7676
}
7777

78-
func (srv *ModelServer) UpdatePod(pod *PodInfo) {
78+
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
7979
srv.pod.Store(pod)
8080
}
8181

pkg/epp/datalayer/factory.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type PoolInfo interface {
4646
// providing methods to allocate and retire endpoints. This can potentially be used for
4747
// pooled memory or other management chores in the implementation.
4848
type EndpointFactory interface {
49-
NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint
49+
NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint
5050
ReleaseEndpoint(ep Endpoint)
5151
}
5252

@@ -71,16 +71,16 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati
7171
// NewEndpoint implements EndpointFactory.NewEndpoint.
7272
// Creates a new endpoint and starts its associated collector with its own ticker.
7373
// Guards against multiple concurrent calls for the same endpoint.
74-
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint {
75-
key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name}
74+
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint {
75+
key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name}
7676
logger := log.FromContext(parent).WithValues("pod", key)
7777

7878
if _, ok := lc.collectors.Load(key); ok {
7979
logger.Info("collector already running for endpoint", "endpoint", key)
8080
return nil
8181
}
8282

83-
endpoint := NewEndpoint(inpod, nil)
83+
endpoint := NewEndpoint(inEndpointMetadata, nil)
8484
collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo
8585

8686
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
@@ -102,7 +102,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
102102
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
103103
// Stops the collector and cleans up resources for the endpoint
104104
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
105-
key := ep.GetPod().GetNamespacedName()
105+
key := ep.GetMetadata().GetNamespacedName()
106106

107107
if value, ok := lc.collectors.LoadAndDelete(key); ok {
108108
collector := value.(*Collector)

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
9999
// Collect is triggered by the data layer framework to fetch potentially new
100100
// MSP metrics data for an endpoint.
101101
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
102-
target := dataSrc.getMetricsEndpoint(ep.GetPod())
103-
families, err := dataSrc.client.Get(ctx, target, ep.GetPod())
102+
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
103+
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
104104

105105
if err != nil {
106106
return err

pkg/epp/datastore/datastore.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ func (ds *datastore) ModelRewriteGetAll() []*v1alpha2.InferenceModelRewrite {
239239
// /// Pods/endpoints APIs ///
240240
// TODO: add a flag for callers to specify the staleness threshold for metrics.
241241
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
242-
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
243-
res := []backendmetrics.PodMetrics{}
242+
func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint {
243+
res := []datalayer.Endpoint{}
244244

245245
ds.pods.Range(func(k, v any) bool {
246-
pm := v.(backendmetrics.PodMetrics)
247-
if predicate(pm) {
248-
res = append(res, pm)
246+
ep := v.(datalayer.Endpoint)
247+
if predicate(ep) {
248+
res = append(res, ep)
249249
}
250250
return true
251251
})
@@ -267,14 +267,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
267267
if len(ds.pool.TargetPorts) == 1 {
268268
modelServerMetricsPort = int(ds.modelServerMetricsPort)
269269
}
270-
pods := []*datalayer.PodInfo{}
270+
pods := []*datalayer.EndpointMetadata{}
271271
for idx, port := range ds.pool.TargetPorts {
272272
metricsPort := modelServerMetricsPort
273273
if metricsPort == 0 {
274274
metricsPort = port
275275
}
276276
pods = append(pods,
277-
&datalayer.PodInfo{
277+
&datalayer.EndpointMetadata{
278278
NamespacedName: types.NamespacedName{
279279
Name: pod.Name + "-rank-" + strconv.Itoa(idx),
280280
Namespace: pod.Namespace,
@@ -288,28 +288,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
288288
}
289289

290290
result := true
291-
for _, podInfo := range pods {
292-
var pm backendmetrics.PodMetrics
293-
existing, ok := ds.pods.Load(podInfo.NamespacedName)
291+
for _, endpointMetadata := range pods {
292+
var ep datalayer.Endpoint
293+
existing, ok := ds.pods.Load(endpointMetadata.NamespacedName)
294294
if !ok {
295-
pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds)
296-
ds.pods.Store(podInfo.NamespacedName, pm)
295+
ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds)
296+
ds.pods.Store(endpointMetadata.NamespacedName, ep)
297297
result = false
298298
} else {
299-
pm = existing.(backendmetrics.PodMetrics)
299+
ep = existing.(backendmetrics.PodMetrics)
300300
}
301-
// Update pod properties if anything changed.
302-
pm.UpdatePod(podInfo)
301+
// Update endpoint properties if anything changed.
302+
ep.UpdateMetadata(endpointMetadata)
303303
}
304304
return result
305305
}
306306

307307
func (ds *datastore) PodDelete(podName string) {
308308
ds.pods.Range(func(k, v any) bool {
309-
pm := v.(backendmetrics.PodMetrics)
310-
if pm.GetPod().PodName == podName {
309+
ep := v.(datalayer.Endpoint)
310+
if ep.GetMetadata().PodName == podName {
311311
ds.pods.Delete(k)
312-
ds.epf.ReleaseEndpoint(pm)
312+
ds.epf.ReleaseEndpoint(ep)
313313
}
314314
return true
315315
})
@@ -341,10 +341,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
341341

342342
// Remove pods that don't belong to the pool or not ready any more.
343343
ds.pods.Range(func(k, v any) bool {
344-
pm := v.(backendmetrics.PodMetrics)
345-
if exist := activePods[pm.GetPod().PodName]; !exist {
346-
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod())
347-
ds.PodDelete(pm.GetPod().PodName)
344+
ep := v.(datalayer.Endpoint)
345+
if exist := activePods[ep.GetMetadata().PodName]; !exist {
346+
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata())
347+
ds.PodDelete(ep.GetMetadata().PodName)
348348
}
349349
return true
350350
})

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
7474
prometheus.GaugeValue,
7575
float64(pod.GetMetrics().WaitingQueueSize),
7676
pool.Name,
77-
pod.GetPod().NamespacedName.Name,
77+
pod.GetMetadata().NamespacedName.Name,
7878
)
7979
}
8080
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
264264
podTotalCount := 0
265265
podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
266266
podTotalCount++
267-
if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
267+
if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found {
268268
return true
269269
}
270270
return false
@@ -308,9 +308,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
308308
pm := make([]schedulingtypes.Pod, len(pods))
309309
for i, pod := range pods {
310310
if pod.GetAttributes() != nil {
311-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
311+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
312312
} else {
313-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
313+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
314314
}
315315
}
316316

@@ -368,7 +368,7 @@ func (d *Director) GetRandomPod() *backend.Pod {
368368
}
369369
number := rand.Intn(len(pods))
370370
pod := pods[number]
371-
return pod.GetPod()
371+
return pod.GetMetadata()
372372
}
373373

374374
func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest,

0 commit comments

Comments
 (0)