diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 5ce4d8983..7bc3d85ad 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -447,6 +447,7 @@ func (r *Runner) registerInTreePlugins() { plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory) plugins.Register(scorer.KvCacheUtilizationScorerType, scorer.KvCacheUtilizationScorerFactory) plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory) + plugins.Register(scorer.RunningRequestsSizeScorerType, scorer.RunningRequestsSizeScorerFactory) plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory) // Latency predictor plugins plugins.Register(slo_aware_router.SLOAwareRouterPluginType, slo_aware_router.SLOAwareRouterFactory) diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go index d2d9b60f9..4c7bb13a4 100644 --- a/pkg/epp/backend/metrics/metrics.go +++ b/pkg/epp/backend/metrics/metrics.go @@ -100,7 +100,7 @@ func (p *PodMetricsClientImpl) promToPodMetrics( if p.MetricMapping.TotalRunningRequests != nil { running, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalRunningRequests) if err == nil { - updated.RunningQueueSize = int(running.GetGauge().GetValue()) + updated.RunningRequestsSize = int(running.GetGauge().GetValue()) } else { errs = multierr.Append(errs, err) } diff --git a/pkg/epp/datalayer/metrics.go b/pkg/epp/datalayer/metrics.go index 7deecb9a3..9faefc32e 100644 --- a/pkg/epp/datalayer/metrics.go +++ b/pkg/epp/datalayer/metrics.go @@ -28,7 +28,7 @@ type Metrics struct { WaitingModels map[string]int // MaxActiveModels is the maximum number of models that can be loaded to GPU. MaxActiveModels int - RunningQueueSize int + RunningRequestsSize int WaitingQueueSize int KVCacheUsagePercent float64 KvCacheMaxTokenCapacity int @@ -74,7 +74,7 @@ func (m *Metrics) Clone() *Metrics { ActiveModels: activeModels, WaitingModels: waitingModels, MaxActiveModels: m.MaxActiveModels, - RunningQueueSize: m.RunningQueueSize, + RunningRequestsSize: m.RunningRequestsSize, WaitingQueueSize: m.WaitingQueueSize, KVCacheUsagePercent: m.KVCacheUsagePercent, KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity, diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index e296cd3bc..c92d8bf64 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ b/pkg/epp/datalayer/metrics/extractor.go @@ -56,6 +56,7 @@ type Extractor struct { func Produces() map[string]any { return map[string]any{ metrics.WaitingQueueSizeKey: int(0), + metrics.RunningRequestsSizeKey: int(0), metrics.KVCacheUsagePercentKey: float64(0), metrics.ActiveModelsKey: map[string]int{}, metrics.WaitingModelsKey: map[string]int{}, @@ -119,7 +120,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi if metric, err := spec.getLatestMetric(families); err != nil { errs = append(errs, err) } else { - clone.RunningQueueSize = int(extractValue(metric)) + clone.RunningRequestsSize = int(extractValue(metric)) updated = true } } diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index 0c0a20761..7c11c10af 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -74,7 +74,7 @@ func TestLogger(t *testing.T) { assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}") assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Metadata: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5") - assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048") + assert.Contains(t, logOutput, "RunningRequestsSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048") assert.Contains(t, logOutput, "Metadata: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } @@ -106,7 +106,7 @@ func (f *fakeDataStore) PodList(predicate func(datalayer.Endpoint) bool) []datal ActiveModels: map[string]int{"modelA": 1}, WaitingModels: map[string]int{"modelB": 2}, MaxActiveModels: 5, - RunningQueueSize: 3, + RunningRequestsSize: 3, WaitingQueueSize: 7, KVCacheUsagePercent: 42.5, KvCacheMaxTokenCapacity: 2048, diff --git a/pkg/epp/datalayer/metrics_test.go b/pkg/epp/datalayer/metrics_test.go index dbdf3df1b..be7439d11 100644 --- a/pkg/epp/datalayer/metrics_test.go +++ b/pkg/epp/datalayer/metrics_test.go @@ -29,7 +29,7 @@ func TestMetricsClone(t *testing.T) { ActiveModels: map[string]int{"modelA": 1}, WaitingModels: map[string]int{"modelB": 2}, MaxActiveModels: 5, - RunningQueueSize: 3, + RunningRequestsSize: 3, WaitingQueueSize: 7, KVCacheUsagePercent: 42.5, KvCacheMaxTokenCapacity: 2048, diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index c3f472070..fdb806f93 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -37,6 +37,7 @@ const ( KVCacheUsagePercentKey = "KVCacheUsagePercent" WaitingQueueSizeKey = "WaitingQueueSize" + RunningRequestsSizeKey = "RunningRequestsSize" MaxActiveModelsKey = "MaxActiveModels" ActiveModelsKey = "ActiveModels" WaitingModelsKey = "WaitingModels" diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go index 7482de93c..03d2ce59a 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/latencypredictor_helper.go @@ -87,7 +87,7 @@ func processHeaderForLatencyPrediction( KVCachePercentage: m.KVCacheUsagePercent, InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)), NumRequestWaiting: m.WaitingQueueSize, - NumRequestRunning: m.RunningQueueSize, + NumRequestRunning: m.RunningRequestsSize, NumTokensGenerated: 0, PrefixCacheScore: prefix_cache_score, } @@ -174,7 +174,7 @@ func recordTTFTTrainingData( ActualTPOT: 0, Timestamp: now, NumRequestWaiting: m.WaitingQueueSize, - NumRequestRunning: m.RunningQueueSize, + NumRequestRunning: m.RunningRequestsSize, NumTokensGenerated: 0, PrefixCacheScore: prefixCacheScore, } @@ -201,7 +201,7 @@ func predictFirstTPOT( KVCachePercentage: m.KVCacheUsagePercent, InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)), NumRequestWaiting: m.WaitingQueueSize, - NumRequestRunning: m.RunningQueueSize, + NumRequestRunning: m.RunningRequestsSize, NumTokensGenerated: sloCtx.generatedTokenCount, PrefixCacheScore: 0, } @@ -260,7 +260,7 @@ func processTokenForLatencyPrediction( ActualTPOT: latencyMs, Timestamp: now, NumRequestWaiting: m.WaitingQueueSize, - NumRequestRunning: m.RunningQueueSize, + NumRequestRunning: m.RunningRequestsSize, NumTokensGenerated: sloCtx.generatedTokenCount - 1, PrefixCacheScore: 0, // TPOT does not use prefix cache score } @@ -274,7 +274,7 @@ func processTokenForLatencyPrediction( KVCachePercentage: m.KVCacheUsagePercent, InputTokenLength: len(strings.Fields(sloCtx.schedulingRequest.Body.Completions.Prompt)), NumRequestWaiting: m.WaitingQueueSize, - NumRequestRunning: m.RunningQueueSize, + NumRequestRunning: m.RunningRequestsSize, NumTokensGenerated: sloCtx.generatedTokenCount, PrefixCacheScore: 0, // TPOT does not use prefix cache score } @@ -337,7 +337,7 @@ func bulkPredictWithMetrics( KVCachePercentage: metricsStates[i].KVCacheUsagePercent, InputTokenLength: len(strings.Fields(prompts[i])), NumRequestWaiting: metricsStates[i].WaitingQueueSize, - NumRequestRunning: metricsStates[i].RunningQueueSize, + NumRequestRunning: metricsStates[i].RunningRequestsSize, NumTokensGenerated: generatedTokenCounts[i], PrefixCacheScore: prefixCacheScores[i], } @@ -385,7 +385,7 @@ func bulkPredictWithMetrics( "generated_tokens", bulkRequests[i].NumTokensGenerated, "kv_cache_percent", bulkRequests[i].KVCachePercentage, "waiting_queue", bulkRequests[i].NumRequestWaiting, - "running_queue", bulkRequests[i].NumRequestRunning, + "running_requests", bulkRequests[i].NumRequestRunning, "prefix_cache_score", bulkRequests[i].PrefixCacheScore) } } diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks_test.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks_test.go index 5aaf1a2a2..ac7344e30 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks_test.go @@ -36,17 +36,17 @@ import ( ) const ( - testModelName = "test-model" - kvUsage = 1 - runningQueue = 1 - waitingQueue = 1 + testModelName = "test-model" + kvUsage = 1 + runningRequests = 1 + waitingQueue = 1 ) // Helper functions func createTestSchedulingResult(pod *backend.Pod) *schedulingtypes.SchedulingResult { - mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningQueue, waitingQueue) + mockPod := createTestPod(pod.NamespacedName.Name, kvUsage, runningRequests, waitingQueue) return &schedulingtypes.SchedulingResult{ PrimaryProfileName: "default", @@ -343,12 +343,12 @@ func TestSLOAwareRouter_ResponseStreaming_FirstToken(t *testing.T) { sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{ KVCacheUsagePercent: 0.5, WaitingQueueSize: 1, - RunningQueueSize: 1, + RunningRequestsSize: 1, } sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{ KVCacheUsagePercent: 0.5, WaitingQueueSize: 1, - RunningQueueSize: 1, + RunningRequestsSize: 1, } router.setSLOContextForRequest(request, sloCtx) @@ -394,12 +394,12 @@ func TestSLOAwareRouter_ResponseStreaming_SubsequentTokens(t *testing.T) { sloCtx.lastSeenMetrics["prefill"] = &backendmetrics.MetricsState{ KVCacheUsagePercent: 0.5, WaitingQueueSize: 1, - RunningQueueSize: 1, + RunningRequestsSize: 1, } sloCtx.lastSeenMetrics["default"] = &backendmetrics.MetricsState{ KVCacheUsagePercent: 0.5, WaitingQueueSize: 1, - RunningQueueSize: 1, + RunningRequestsSize: 1, } firstTokenTime := time.Now().Add(-100 * time.Millisecond) diff --git a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go index f34fe38be..8d8f68393 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/scorer_test.go @@ -103,7 +103,7 @@ func (m *mockPredictor) GetServerStatus(ctx context.Context) (*latencypredictor. return &latencypredictor.ServerStatusResponse{}, nil } -func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQueueSize int) schedulingtypes.Pod { +func createTestPod(name string, kvCacheUsage float64, runningRequestsSize, waitingQueueSize int) schedulingtypes.Pod { return &schedulingtypes.PodMetrics{ Pod: &backend.Pod{ NamespacedName: types.NamespacedName{ @@ -113,7 +113,7 @@ func createTestPod(name string, kvCacheUsage float64, runningQueueSize, waitingQ }, MetricsState: &backendmetrics.MetricsState{ KVCacheUsagePercent: kvCacheUsage, - RunningQueueSize: runningQueueSize, + RunningRequestsSize: runningRequestsSize, WaitingQueueSize: waitingQueueSize, }, } diff --git a/pkg/epp/scheduling/framework/plugins/scorer/running.go b/pkg/epp/scheduling/framework/plugins/scorer/running.go new file mode 100644 index 000000000..c446ac0ca --- /dev/null +++ b/pkg/epp/scheduling/framework/plugins/scorer/running.go @@ -0,0 +1,104 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scorer + +import ( + "context" + "encoding/json" + "math" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +const ( + RunningRequestsSizeScorerType = "running-requests-size-scorer" +) + +// compile-time type assertion +var _ framework.Scorer = &RunningRequestsSizeScorer{} + +// RunningRequestsSizeScorerFactory defines the factory function for RunningRequestsSizeScorer. +func RunningRequestsSizeScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) { + return NewRunningRequestsSizeScorer().WithName(name), nil +} + +// NewRunningRequestsSizeScorer initializes a new RunningRequestsSizeScorer and returns its pointer. +func NewRunningRequestsSizeScorer() *RunningRequestsSizeScorer { + return &RunningRequestsSizeScorer{ + typedName: plugins.TypedName{Type: RunningRequestsSizeScorerType, Name: RunningRequestsSizeScorerType}, + } +} + +// RunningRequestsSizeScorer scores list of candidate pods based on the pod's running request size. +// the less running request size the pod has, the higher score it will get (since it's more available to serve new request). +type RunningRequestsSizeScorer struct { + typedName plugins.TypedName +} + +// TypedName returns the type and name tuple of this plugin instance. +func (s *RunningRequestsSizeScorer) TypedName() plugins.TypedName { + return s.typedName +} + +// Consumes returns the list of data that is consumed by the plugin. +func (s *RunningRequestsSizeScorer) Consumes() map[string]any { + return map[string]any{ + metrics.RunningRequestsSizeKey: int(0), + } +} + +// WithName sets the name of the scorer. +func (s *RunningRequestsSizeScorer) WithName(name string) *RunningRequestsSizeScorer { + s.typedName.Name = name + return s +} + +// Score returns the scoring result for the given list of pods based on context. +func (s *RunningRequestsSizeScorer) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 { + minQueueSize := math.MaxInt + maxQueueSize := math.MinInt + + // Iterate through the remaining pods to find min and max + for _, pod := range pods { + queueSize := pod.GetMetrics().RunningRequestsSize + if queueSize < minQueueSize { + minQueueSize = queueSize + } + if queueSize > maxQueueSize { + maxQueueSize = queueSize + } + } + + // podScoreFunc calculates the score based on the queue size of each pod. Longer queue gets a lower score. + podScoreFunc := func(pod types.Pod) float64 { + if maxQueueSize == minQueueSize { + // If all pods have the same queue size, return a neutral score + return 1.0 + } + return float64(maxQueueSize-pod.GetMetrics().RunningRequestsSize) / float64(maxQueueSize-minQueueSize) + } + + // Create a map to hold the scores for each pod + scores := make(map[types.Pod]float64, len(pods)) + for _, pod := range pods { + scores[pod] = podScoreFunc(pod) + } + return scores +} diff --git a/pkg/epp/scheduling/framework/plugins/scorer/running_test.go b/pkg/epp/scheduling/framework/plugins/scorer/running_test.go new file mode 100644 index 000000000..76864d480 --- /dev/null +++ b/pkg/epp/scheduling/framework/plugins/scorer/running_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scorer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" +) + +func TestRunningRequestsSizeScorer(t *testing.T) { + tests := []struct { + name string + pods []types.Pod + expectedScoresPod map[int]float64 // Map of pod index to expected score + }{ + { + name: "Different running queue sizes", + pods: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 10}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 5}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 0}}, + }, + expectedScoresPod: map[int]float64{ + 0: 0.0, // Longest queue (10) gets lowest score + 1: 0.5, // Medium queue (5) gets medium score + 2: 1.0, // Shortest queue (0) gets highest score + }, + }, + { + name: "Same running queue sizes", + pods: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 5}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 5}}, + }, + expectedScoresPod: map[int]float64{ + 0: 1.0, // When all pods have the same queue size, they get the same neutral score + 1: 1.0, + }, + }, + { + name: "Zero running queue sizes", + pods: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 0}}, + &types.PodMetrics{Pod: &backend.Pod{}, MetricsState: &backendmetrics.MetricsState{RunningRequestsSize: 0}}, + }, + expectedScoresPod: map[int]float64{ + 0: 1.0, + 1: 1.0, + }, + }, + } + + scorer := &RunningRequestsSizeScorer{} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + scores := scorer.Score(context.Background(), types.NewCycleState(), &types.LLMRequest{}, test.pods) + + for i, pod := range test.pods { + expectedScore := test.expectedScoresPod[i] + assert.InDelta(t, expectedScore, scores[pod], 0.0001, "Pod %d should have score %f", i, expectedScore) + } + }) + } +}