Skip to content

Commit 267ed01

Browse files
committed
Non test code updates due to PodInfo rename
Signed-off-by: Shmuel Kallner <kallner@il.ibm.com>
1 parent 78fccc8 commit 267ed01

File tree

12 files changed

+61
-61
lines changed

12 files changed

+61
-61
lines changed

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -53,18 +53,18 @@ type PodMetricsClient interface {
5353
}
5454

5555
func (pm *podMetrics) String() string {
56-
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
56+
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
5757
}
5858

59-
func (pm *podMetrics) GetPod() *backend.Pod {
59+
func (pm *podMetrics) GetMetadata() *backend.Pod {
6060
return pm.pod.Load()
6161
}
6262

6363
func (pm *podMetrics) GetMetrics() *MetricsState {
6464
return pm.metrics.Load()
6565
}
6666

67-
func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
67+
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
6868
pm.pod.Store(pod)
6969
}
7070

@@ -73,7 +73,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
7373
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
7474
pm.startOnce.Do(func() {
7575
go func() {
76-
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
76+
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata())
7777
ticker := time.NewTicker(pm.interval)
7878
defer ticker.Stop()
7979
for {
@@ -84,7 +84,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
8484
return
8585
case <-ticker.C: // refresh metrics periodically
8686
if err := pm.refreshMetrics(); err != nil {
87-
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
87+
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata())
8888
}
8989
}
9090
}
@@ -95,7 +95,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
9595
func (pm *podMetrics) refreshMetrics() error {
9696
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
9797
defer cancel()
98-
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
98+
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
9999
if err != nil {
100100
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
101101
}
@@ -115,7 +115,7 @@ func (pm *podMetrics) refreshMetrics() error {
115115
}
116116

117117
func (pm *podMetrics) stopRefreshLoop() {
118-
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
118+
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata())
119119
pm.stopOnce.Do(func() {
120120
close(pm.done)
121121
})

pkg/epp/backend/metrics/types.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ type PodMetricsFactory struct {
5252
refreshMetricsInterval time.Duration
5353
}
5454

55-
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
55+
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.EndpointMetadata, ds datalayer.PoolInfo) PodMetrics {
5656
pm := &podMetrics{
5757
pmc: f.pmc,
5858
ds: ds,

pkg/epp/backend/pod.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,4 +20,4 @@ import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
2121
)
2222

23-
type Pod = datalayer.PodInfo
23+
type Pod = datalayer.EndpointMetadata

pkg/epp/datalayer/collector.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
8888
started := false
8989

9090
c.startOnce.Do(func() {
91-
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
91+
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
9292
c.ctx, c.cancel = context.WithCancel(ctx)
9393
started = true
9494
ready = make(chan struct{})

pkg/epp/datalayer/endpoint.go

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,10 @@ import (
2121
"sync/atomic"
2222
)
2323

24-
// EndpointPodState allows management of the Pod related attributes.
25-
type EndpointPodState interface {
26-
GetPod() *PodInfo
27-
UpdatePod(*PodInfo)
24+
// EndpointMetaState allows management of the EndpointMetadata related attributes.
25+
type EndpointMetaState interface {
26+
GetMetadata() *EndpointMetadata
27+
UpdateMetadata(*EndpointMetadata)
2828
GetAttributes() *Attributes
2929
}
3030

@@ -37,14 +37,14 @@ type EndpointMetricsState interface {
3737
// Endpoint represents an inference serving endpoint and its related attributes.
3838
type Endpoint interface {
3939
fmt.Stringer
40-
EndpointPodState
40+
EndpointMetaState
4141
EndpointMetricsState
4242
AttributeMap
4343
}
4444

4545
// ModelServer is an implementation of the Endpoint interface.
4646
type ModelServer struct {
47-
pod atomic.Pointer[PodInfo]
47+
pod atomic.Pointer[EndpointMetadata]
4848
metrics atomic.Pointer[Metrics]
4949
attributes *Attributes
5050
}
@@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
6868
// String returns a representation of the ModelServer. For brevity, only names of
6969
// extended attributes are returned and not their values.
7070
func (srv *ModelServer) String() string {
71-
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
71+
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
7272
}
7373

74-
func (srv *ModelServer) GetPod() *PodInfo {
74+
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
7575
return srv.pod.Load()
7676
}
7777

78-
func (srv *ModelServer) UpdatePod(pod *PodInfo) {
78+
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
7979
srv.pod.Store(pod)
8080
}
8181

pkg/epp/datalayer/factory.go

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ type PoolInfo interface {
4646
// providing methods to allocate and retire endpoints. This can potentially be used for
4747
// pooled memory or other management chores in the implementation.
4848
type EndpointFactory interface {
49-
NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint
49+
NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint
5050
ReleaseEndpoint(ep Endpoint)
5151
}
5252

@@ -71,16 +71,16 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati
7171
// NewEndpoint implements EndpointFactory.NewEndpoint.
7272
// Creates a new endpoint and starts its associated collector with its own ticker.
7373
// Guards against multiple concurrent calls for the same endpoint.
74-
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint {
75-
key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name}
74+
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint {
75+
key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name}
7676
logger := log.FromContext(parent).WithValues("pod", key)
7777

7878
if _, ok := lc.collectors.Load(key); ok {
7979
logger.Info("collector already running for endpoint", "endpoint", key)
8080
return nil
8181
}
8282

83-
endpoint := NewEndpoint(inpod, nil)
83+
endpoint := NewEndpoint(inEndpointMetadata, nil)
8484
collector := NewCollector() // TODO: for full backward compatibility, set the logger and poolinfo
8585

8686
if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
@@ -102,7 +102,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo,
102102
// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint
103103
// Stops the collector and cleans up resources for the endpoint
104104
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
105-
key := ep.GetPod().GetNamespacedName()
105+
key := ep.GetMetadata().GetNamespacedName()
106106

107107
if value, ok := lc.collectors.LoadAndDelete(key); ok {
108108
collector := value.(*Collector)

pkg/epp/datalayer/metrics/datasource.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -98,8 +98,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
9898
// Collect is triggered by the data layer framework to fetch potentially new
9999
// MSP metrics data for an endpoint.
100100
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
101-
target := dataSrc.getMetricsEndpoint(ep.GetPod())
102-
families, err := dataSrc.client.Get(ctx, target, ep.GetPod())
101+
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
102+
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
103103

104104
if err != nil {
105105
return err

pkg/epp/datastore/datastore.go

Lines changed: 22 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -207,13 +207,13 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective {
207207
// /// Pods/endpoints APIs ///
208208
// TODO: add a flag for callers to specify the staleness threshold for metrics.
209209
// ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694
210-
func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics {
211-
res := []backendmetrics.PodMetrics{}
210+
func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint {
211+
res := []datalayer.Endpoint{}
212212

213213
ds.pods.Range(func(k, v any) bool {
214-
pm := v.(backendmetrics.PodMetrics)
215-
if predicate(pm) {
216-
res = append(res, pm)
214+
ep := v.(datalayer.Endpoint)
215+
if predicate(ep) {
216+
res = append(res, ep)
217217
}
218218
return true
219219
})
@@ -235,14 +235,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
235235
if len(ds.pool.TargetPorts) == 1 {
236236
modelServerMetricsPort = int(ds.modelServerMetricsPort)
237237
}
238-
pods := []*datalayer.PodInfo{}
238+
pods := []*datalayer.EndpointMetadata{}
239239
for idx, port := range ds.pool.TargetPorts {
240240
metricsPort := modelServerMetricsPort
241241
if metricsPort == 0 {
242242
metricsPort = port
243243
}
244244
pods = append(pods,
245-
&datalayer.PodInfo{
245+
&datalayer.EndpointMetadata{
246246
NamespacedName: types.NamespacedName{
247247
Name: pod.Name + "-rank-" + strconv.Itoa(idx),
248248
Namespace: pod.Namespace,
@@ -256,28 +256,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
256256
}
257257

258258
result := true
259-
for _, podInfo := range pods {
260-
var pm backendmetrics.PodMetrics
261-
existing, ok := ds.pods.Load(podInfo.NamespacedName)
259+
for _, endpointMetadata := range pods {
260+
var ep datalayer.Endpoint
261+
existing, ok := ds.pods.Load(endpointMetadata.NamespacedName)
262262
if !ok {
263-
pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds)
264-
ds.pods.Store(podInfo.NamespacedName, pm)
263+
ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds)
264+
ds.pods.Store(endpointMetadata.NamespacedName, ep)
265265
result = false
266266
} else {
267-
pm = existing.(backendmetrics.PodMetrics)
267+
ep = existing.(backendmetrics.PodMetrics)
268268
}
269-
// Update pod properties if anything changed.
270-
pm.UpdatePod(podInfo)
269+
// Update endpoint properties if anything changed.
270+
ep.UpdateMetadata(endpointMetadata)
271271
}
272272
return result
273273
}
274274

275275
func (ds *datastore) PodDelete(podName string) {
276276
ds.pods.Range(func(k, v any) bool {
277-
pm := v.(backendmetrics.PodMetrics)
278-
if pm.GetPod().PodName == podName {
277+
ep := v.(datalayer.Endpoint)
278+
if ep.GetMetadata().PodName == podName {
279279
ds.pods.Delete(k)
280-
ds.epf.ReleaseEndpoint(pm)
280+
ds.epf.ReleaseEndpoint(ep)
281281
}
282282
return true
283283
})
@@ -309,10 +309,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err
309309

310310
// Remove pods that don't belong to the pool or are not ready any more.
311311
ds.pods.Range(func(k, v any) bool {
312-
pm := v.(backendmetrics.PodMetrics)
313-
if exist := activePods[pm.GetPod().PodName]; !exist {
314-
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod())
315-
ds.PodDelete(pm.GetPod().PodName)
312+
ep := v.(datalayer.Endpoint)
313+
if exist := activePods[ep.GetMetadata().PodName]; !exist {
314+
logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata())
315+
ds.PodDelete(ep.GetMetadata().PodName)
316316
}
317317
return true
318318
})

pkg/epp/metrics/collectors/inference_pool.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
7474
prometheus.GaugeValue,
7575
float64(pod.GetMetrics().WaitingQueueSize),
7676
pool.Name,
77-
pod.GetPod().NamespacedName.Name,
77+
pod.GetMetadata().NamespacedName.Name,
7878
)
7979
}
8080
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -233,7 +233,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet
233233
podTotalCount := 0
234234
podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool {
235235
podTotalCount++
236-
if _, found := endpoints[pm.GetPod().GetIPAddress()]; found {
236+
if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found {
237237
return true
238238
}
239239
return false
@@ -277,9 +277,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
277277
pm := make([]schedulingtypes.Pod, len(pods))
278278
for i, pod := range pods {
279279
if pod.GetAttributes() != nil {
280-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
280+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()}
281281
} else {
282-
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
282+
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()}
283283
}
284284
}
285285

@@ -336,7 +336,7 @@ func (d *Director) GetRandomPod() *backend.Pod {
336336
}
337337
number := rand.Intn(len(pods))
338338
pod := pods[number]
339-
return pod.GetPod()
339+
return pod.GetMetadata()
340340
}
341341

342342
func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest,

0 commit comments

Comments
 (0)