From 6b1988a8b8ffffb5c375b84c82a02670ae45681c Mon Sep 17 00:00:00 2001
From: CYJiang
Date: Tue, 2 Dec 2025 15:21:49 +0800
Subject: [PATCH] feat(metrics): add scheduler attempt counter and outcome helper

Signed-off-by: CYJiang
---
 pkg/epp/metrics/metrics.go                    | 28 +++++-
 pkg/epp/metrics/metrics_test.go               | 45 +++++++++++++++++++
 .../testdata/scheduler_attempts_total_metrics |  4 ++
 pkg/epp/scheduling/scheduler.go               |  8 ++--
 4 files changed, 81 insertions(+), 4 deletions(-)
 create mode 100644 pkg/epp/metrics/testdata/scheduler_attempts_total_metrics

diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go
index 44c3be87d..c3f472070 100644
--- a/pkg/epp/metrics/metrics.go
+++ b/pkg/epp/metrics/metrics.go
@@ -299,6 +299,16 @@ var (
 		[]string{},
 	)
 
+	// SchedulerAttemptsTotal counts the total number of scheduling attempts, labeled by status.
+	SchedulerAttemptsTotal = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: InferenceExtension,
+			Name:      "scheduler_attempts_total",
+			Help:      metricsutil.HelpMsgWithStability("Total number of scheduling attempts.", compbasemetrics.ALPHA),
+		},
+		[]string{"status"}, // "success", "failure"
+	)
+
 	PluginProcessingLatencies = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Subsystem: InferenceExtension,
@@ -419,6 +429,7 @@ func Register(customCollectors ...prometheus.Collector) {
 	metrics.Registry.MustRegister(inferencePoolAvgQueueSize)
 	metrics.Registry.MustRegister(inferencePoolReadyPods)
 	metrics.Registry.MustRegister(SchedulerE2ELatency)
+	metrics.Registry.MustRegister(SchedulerAttemptsTotal)
 	metrics.Registry.MustRegister(PluginProcessingLatencies)
 	metrics.Registry.MustRegister(InferenceExtensionInfo)
 	metrics.Registry.MustRegister(PrefixCacheSize)
@@ -464,6 +475,7 @@ func Reset() {
 	inferencePoolAvgQueueSize.Reset()
 	inferencePoolReadyPods.Reset()
 	SchedulerE2ELatency.Reset()
+	SchedulerAttemptsTotal.Reset()
 	PluginProcessingLatencies.Reset()
 	InferenceExtensionInfo.Reset()
 	PrefixCacheSize.Reset()
@@ -474,7 +486,7 @@ func Reset() {
 	inferenceModelRewriteDecisionsTotal.Reset()
 }
 
-// RecordRequstCounter records the number of requests.
+// RecordRequestCounter records the number of requests.
 func RecordRequestCounter(modelName, targetModelName string) {
 	requestCounter.WithLabelValues(modelName, targetModelName).Inc()
 }
@@ -696,6 +708,20 @@ func RecordSchedulerE2ELatency(duration time.Duration) {
 	SchedulerE2ELatency.WithLabelValues().Observe(duration.Seconds())
 }
 
+// RecordSchedulerAttempt records a scheduling attempt with its outcome status.
+func RecordSchedulerAttempt(err error) {
+	if err != nil {
+		SchedulerAttemptsTotal.WithLabelValues(SchedulerStatusFailure).Inc()
+	} else {
+		SchedulerAttemptsTotal.WithLabelValues(SchedulerStatusSuccess).Inc()
+	}
+}
+
+const (
+	SchedulerStatusSuccess = "success"
+	SchedulerStatusFailure = "failure"
+)
+
 // RecordPluginProcessingLatency records the processing latency for a plugin.
 func RecordPluginProcessingLatency(extensionPoint, pluginType, pluginName string, duration time.Duration) {
 	PluginProcessingLatencies.WithLabelValues(extensionPoint, pluginType, pluginName).Observe(duration.Seconds())
diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go
index a39ade84e..5dac3731e 100644
--- a/pkg/epp/metrics/metrics_test.go
+++ b/pkg/epp/metrics/metrics_test.go
@@ -18,6 +18,7 @@ package metrics
 
 import (
 	"context"
+	"errors"
 	"os"
 	"testing"
 	"time"
@@ -684,6 +685,50 @@ func TestSchedulerE2ELatency(t *testing.T) {
 	}
 }
 
+func TestSchedulerAttemptsTotal(t *testing.T) {
+
+	scenarios := []struct {
+		name         string
+		successCount int
+		failureCount int
+	}{
+		{
+			name:         "mixed success and failure attempts",
+			successCount: 10,
+			failureCount: 5,
+		},
+	}
+
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			Reset()
+			for i := 0; i < scenario.successCount; i++ {
+				RecordSchedulerAttempt(nil)
+			}
+			for i := 0; i < scenario.failureCount; i++ {
+				RecordSchedulerAttempt(errors.New("simulated scheduling failure"))
+			}
+
+			wantMetrics, err := os.Open("testdata/scheduler_attempts_total_metrics")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer func() {
+				if err := wantMetrics.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err := testutil.GatherAndCompare(
+				metrics.Registry,
+				wantMetrics,
+				"inference_extension_scheduler_attempts_total",
+			); err != nil {
+				t.Errorf("metric comparison failed: %v", err)
+			}
+		})
+	}
+}
+
 func TestPrefixCacheMetrics(t *testing.T) {
 	Reset()
 	const (
diff --git a/pkg/epp/metrics/testdata/scheduler_attempts_total_metrics b/pkg/epp/metrics/testdata/scheduler_attempts_total_metrics
new file mode 100644
index 000000000..655654964
--- /dev/null
+++ b/pkg/epp/metrics/testdata/scheduler_attempts_total_metrics
@@ -0,0 +1,4 @@
+# HELP inference_extension_scheduler_attempts_total [ALPHA] Total number of scheduling attempts.
+# TYPE inference_extension_scheduler_attempts_total counter
+inference_extension_scheduler_attempts_total{status="failure"} 5
+inference_extension_scheduler_attempts_total{status="success"} 10
diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go
index d7bcb1f27..5b2e64d23 100644
--- a/pkg/epp/scheduling/scheduler.go
+++ b/pkg/epp/scheduling/scheduler.go
@@ -44,12 +44,13 @@ type Scheduler struct {
 }
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
-func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
+func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (result *types.SchedulingResult, err error) {
 	loggerVerbose := log.FromContext(ctx).V(logutil.VERBOSE)
 	scheduleStart := time.Now()
 	defer func() {
 		metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart))
+		metrics.RecordSchedulerAttempt(err)
 	}()
 
 	profileRunResults := map[string]*types.ProfileRunResult{}
 
@@ -80,12 +81,13 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, can
 	}
 
 	if len(profileRunResults) == 0 {
-		return nil, fmt.Errorf("failed to run any scheduler profile for request %s", request.RequestId)
+		err = fmt.Errorf("failed to run any scheduler profile for request %s", request.RequestId)
+		return nil, err
 	}
 
 	loggerVerbose.Info("Running profile handler, ProcessResults", "plugin", s.profileHandler.TypedName())
 	before := time.Now()
-	result, err := s.profileHandler.ProcessResults(ctx, cycleState, request, profileRunResults)
+	result, err = s.profileHandler.ProcessResults(ctx, cycleState, request, profileRunResults)
 	metrics.RecordPluginProcessingLatency(framework.ProcessProfilesResultsExtensionPoint, s.profileHandler.TypedName().Type, s.profileHandler.TypedName().Name, time.Since(before))
 	loggerVerbose.Info("Completed running profile handler ProcessResults successfully", "plugin", s.profileHandler.TypedName())
 
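
Note on the scheduler.go hunk: switching Schedule to named return values
(result, err) is what makes the new deferred metrics call work. A deferred
closure reads the final value of a named return, so the attempt is recorded
with whatever error the caller actually receives, including the early-return
path. A minimal standalone sketch of that pattern, with toy names that are
not part of the patch (recordAttempt stands in for
metrics.RecordSchedulerAttempt):

package main

import (
	"errors"
	"fmt"
)

// recordAttempt inspects the error to pick an outcome label, mirroring
// RecordSchedulerAttempt's success/failure branching.
func recordAttempt(err error) {
	if err != nil {
		fmt.Println("recorded: failure")
		return
	}
	fmt.Println("recorded: success")
}

// schedule mirrors Scheduler.Schedule: because err is a named return, the
// deferred closure observes its final value, even on early returns where
// err is assigned just before returning.
func schedule(fail bool) (result string, err error) {
	defer func() { recordAttempt(err) }()

	if fail {
		err = errors.New("no scheduler profile produced a result")
		return "", err
	}
	return "pod-a", nil
}

func main() {
	schedule(false) // prints "recorded: success"
	schedule(true)  // prints "recorded: failure"
}

Had the signature kept unnamed returns, the deferred closure would capture a
stale local err (or not compile at all), which is why the hunk also changes
":=" to "=" when calling ProcessResults.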
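
The testdata file, for its part, is ordinary Prometheus text exposition
format, which testutil.GatherAndCompare parses directly. A self-contained
sketch of the same comparison outside this package (only the upstream
github.com/prometheus/client_golang API is assumed; the plain Help string
here deliberately omits the "[ALPHA]" marker that
metricsutil.HelpMsgWithStability prepends in the real metric):

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Subsystem + Name combine into the full metric name,
	// "inference_extension_scheduler_attempts_total".
	attempts := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: "inference_extension",
			Name:      "scheduler_attempts_total",
			Help:      "Total number of scheduling attempts.",
		},
		[]string{"status"},
	)
	reg := prometheus.NewRegistry()
	reg.MustRegister(attempts)

	attempts.WithLabelValues("success").Add(10)
	attempts.WithLabelValues("failure").Add(5)

	// Mirrors testdata/scheduler_attempts_total_metrics (minus the
	// [ALPHA] stability tag).
	expected := `# HELP inference_extension_scheduler_attempts_total Total number of scheduling attempts.
# TYPE inference_extension_scheduler_attempts_total counter
inference_extension_scheduler_attempts_total{status="failure"} 5
inference_extension_scheduler_attempts_total{status="success"} 10
`
	if err := testutil.GatherAndCompare(reg, strings.NewReader(expected),
		"inference_extension_scheduler_attempts_total"); err != nil {
		fmt.Println("mismatch:", err)
		return
	}
	fmt.Println("metrics match")
}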