generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 206
refactor: Standardize config loading and system default injection #1953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
LukeAVanDrie
wants to merge
3
commits into
kubernetes-sigs:main
Choose a base branch
from
LukeAVanDrie:refactor/config-loader
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+887
−1,084
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,171 +35,206 @@ import ( | |
| "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile" | ||
| ) | ||
|
|
||
| var scheme = runtime.NewScheme() | ||
|
|
||
| var registeredFeatureGates = sets.New[string]() // set of feature gates names, a name must be unique | ||
| var ( | ||
| scheme = runtime.NewScheme() | ||
| registeredFeatureGates = sets.New[string]() | ||
| ) | ||
|
|
||
| func init() { | ||
| utilruntime.Must(configapi.Install(scheme)) | ||
| } | ||
|
|
||
| // LoadConfigPhaseOne first phase of loading configuration from supplied text that was converted to []byte | ||
| func LoadConfigPhaseOne(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, map[string]bool, error) { | ||
| rawConfig, err := loadRawConfig(configBytes) | ||
| // RegisterFeatureGate registers a feature gate name for validation purposes. | ||
| func RegisterFeatureGate(gate string) { | ||
| registeredFeatureGates.Insert(gate) | ||
| } | ||
|
|
||
| // LoadRawConfig parses the raw configuration bytes, applies initial defaults, and extracts feature gates. | ||
| // It does not instantiate plugins. | ||
| func LoadRawConfig(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, map[string]bool, error) { | ||
| // Decode JSON/YAML. | ||
| rawConfig, err := decodeRawConfig(configBytes) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
| logger.V(1).Info("Loaded raw configuration", "config", rawConfig) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we use logger utils consts instead of numbers? |
||
|
|
||
| logger.Info("Loaded configuration", "config", rawConfig) | ||
| // Sanitize data. | ||
| applyStaticDefaults(rawConfig) | ||
|
|
||
| if err = validateFeatureGates(rawConfig.FeatureGates); err != nil { | ||
| return nil, nil, fmt.Errorf("failed to validate feature gates - %w", err) | ||
| // Early validation of Feature Gates. | ||
| // We validate gates early because they might dictate downstream loading logic. | ||
| if err := validateFeatureGates(rawConfig.FeatureGates); err != nil { | ||
| return nil, nil, fmt.Errorf("feature gate validation failed: %w", err) | ||
| } | ||
|
|
||
| setDefaultsPhaseOne(rawConfig) | ||
|
|
||
| featureConfig := loadFeatureConfig(rawConfig.FeatureGates) | ||
|
|
||
| return rawConfig, featureConfig, nil | ||
| } | ||
|
|
||
| // LoadConfigPhaseOne first phase of loading configuration from supplied text that was converted to []byte | ||
| func LoadConfigPhaseTwo(rawConfig *configapi.EndpointPickerConfig, handle plugins.Handle, logger logr.Logger) (*config.Config, error) { | ||
| var err error | ||
| // instantiate loaded plugins | ||
| if err = instantiatePlugins(rawConfig.Plugins, handle); err != nil { | ||
| return nil, fmt.Errorf("failed to instantiate plugins - %w", err) | ||
| } | ||
|
|
||
| setDefaultsPhaseTwo(rawConfig, handle) | ||
| // InstantiateAndConfigure performs the heavy lifting of plugin instantiation, system architecture injection, and | ||
| // scheduler construction. | ||
| func InstantiateAndConfigure( | ||
| rawConfig *configapi.EndpointPickerConfig, | ||
| handle plugins.Handle, | ||
| logger logr.Logger, | ||
| ) (*config.Config, error) { | ||
|
|
||
| logger.Info("Configuration with defaults set", "config", rawConfig) | ||
| // Instantiate user-configured plugins. | ||
| if err := instantiatePlugins(rawConfig.Plugins, handle); err != nil { | ||
| return nil, fmt.Errorf("plugin instantiation failed: %w", err) | ||
| } | ||
|
|
||
| if err = validateSchedulingProfiles(rawConfig); err != nil { | ||
| return nil, fmt.Errorf("failed to validate scheduling profiles - %w", err) | ||
| // Fill in the architectural gaps (inject required system plugins). | ||
| if err := applySystemDefaults(rawConfig, handle); err != nil { | ||
| return nil, fmt.Errorf("system default application failed: %w", err) | ||
| } | ||
| logger.Info("Effective configuration loaded", "config", rawConfig) | ||
|
|
||
| config := &config.Config{} | ||
| // Deep validation checks relationships between now-finalized profiles and plugins. | ||
| if err := validateConfig(rawConfig); err != nil { | ||
| return nil, fmt.Errorf("configuration validation failed: %w", err) | ||
| } | ||
|
|
||
| config.SchedulerConfig, err = loadSchedulerConfig(rawConfig.SchedulingProfiles, handle) | ||
| // Build scheduler config. | ||
| schedulerConfig, err := buildSchedulerConfig(rawConfig.SchedulingProfiles, handle) | ||
| if err != nil { | ||
| return nil, err | ||
| return nil, fmt.Errorf("scheduler config build failed: %w", err) | ||
| } | ||
| config.SaturationDetectorConfig = loadSaturationDetectorConfig(rawConfig.SaturationDetector) | ||
|
|
||
| return config, nil | ||
| return &config.Config{ | ||
| SchedulerConfig: schedulerConfig, | ||
| SaturationDetectorConfig: buildSaturationConfig(rawConfig.SaturationDetector), | ||
| }, nil | ||
| } | ||
|
|
||
| func loadRawConfig(configBytes []byte) (*configapi.EndpointPickerConfig, error) { | ||
| rawConfig := &configapi.EndpointPickerConfig{} | ||
|
|
||
| func decodeRawConfig(configBytes []byte) (*configapi.EndpointPickerConfig, error) { | ||
| cfg := &configapi.EndpointPickerConfig{} | ||
| codecs := serializer.NewCodecFactory(scheme, serializer.EnableStrict) | ||
| err := runtime.DecodeInto(codecs.UniversalDecoder(), configBytes, rawConfig) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("the configuration is invalid - %w", err) | ||
| if err := runtime.DecodeInto(codecs.UniversalDecoder(), configBytes, cfg); err != nil { | ||
| return nil, fmt.Errorf("failed to decode configuration JSON/YAML: %w", err) | ||
| } | ||
| return rawConfig, nil | ||
| return cfg, nil | ||
| } | ||
|
|
||
| func loadSchedulerConfig(configProfiles []configapi.SchedulingProfile, handle plugins.Handle) (*scheduling.SchedulerConfig, error) { | ||
| profiles := map[string]*framework.SchedulerProfile{} | ||
| for _, namedProfile := range configProfiles { | ||
| profile := framework.NewSchedulerProfile() | ||
| for _, plugin := range namedProfile.Plugins { | ||
| referencedPlugin := handle.Plugin(plugin.PluginRef) | ||
| if scorer, ok := referencedPlugin.(framework.Scorer); ok { | ||
| referencedPlugin = framework.NewWeightedScorer(scorer, *plugin.Weight) | ||
| func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error { | ||
| pluginNames := sets.New[string]() | ||
| for _, spec := range configuredPlugins { | ||
| if spec.Type == "" { | ||
| return fmt.Errorf("plugin '%s' is missing a type", spec.Name) | ||
| } | ||
| if pluginNames.Has(spec.Name) { | ||
| return fmt.Errorf("duplicate plugin name '%s'", spec.Name) | ||
| } | ||
| pluginNames.Insert(spec.Name) | ||
|
|
||
| factory, ok := plugins.Registry[spec.Type] | ||
| if !ok { | ||
| return fmt.Errorf("plugin type '%s' is not registered", spec.Type) | ||
| } | ||
|
|
||
| plugin, err := factory(spec.Name, spec.Parameters, handle) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create plugin '%s' (type: %s): %w", spec.Name, spec.Type, err) | ||
| } | ||
|
|
||
| handle.AddPlugin(spec.Name, plugin) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func buildSchedulerConfig( | ||
| configProfiles []configapi.SchedulingProfile, | ||
| handle plugins.Handle, | ||
| ) (*scheduling.SchedulerConfig, error) { | ||
|
|
||
| profiles := make(map[string]*framework.SchedulerProfile) | ||
|
|
||
| // Build profiles. | ||
| for _, cfgProfile := range configProfiles { | ||
| fwProfile := framework.NewSchedulerProfile() | ||
|
|
||
| for _, pluginRef := range cfgProfile.Plugins { | ||
| plugin := handle.Plugin(pluginRef.PluginRef) | ||
| if plugin == nil { // Should be caught by validation, but defensive check. | ||
| return nil, fmt.Errorf( | ||
| "plugin '%s' referenced in profile '%s' not found in handle", | ||
| pluginRef.PluginRef, cfgProfile.Name) | ||
| } | ||
| if err := profile.AddPlugins(referencedPlugin); err != nil { | ||
| return nil, fmt.Errorf("failed to load scheduler config - %w", err) | ||
|
|
||
| // Wrap Scorers with weights. | ||
| if scorer, ok := plugin.(framework.Scorer); ok { | ||
| weight := DefaultScorerWeight | ||
| if pluginRef.Weight != nil { | ||
| weight = *pluginRef.Weight | ||
| } | ||
| plugin = framework.NewWeightedScorer(scorer, weight) | ||
| } | ||
|
|
||
| if err := fwProfile.AddPlugins(plugin); err != nil { | ||
| return nil, fmt.Errorf("failed to add plugin '%s' to profile '%s': %w", pluginRef.PluginRef, cfgProfile.Name, err) | ||
| } | ||
| } | ||
| profiles[namedProfile.Name] = profile | ||
| profiles[cfgProfile.Name] = fwProfile | ||
| } | ||
|
|
||
| // Find Profile Handler (singleton check). | ||
| var profileHandler framework.ProfileHandler | ||
| for pluginName, plugin := range handle.GetAllPluginsWithNames() { | ||
| if theProfileHandler, ok := plugin.(framework.ProfileHandler); ok { | ||
| for name, plugin := range handle.GetAllPluginsWithNames() { | ||
| if ph, ok := plugin.(framework.ProfileHandler); ok { | ||
| if profileHandler != nil { | ||
| return nil, fmt.Errorf("only one profile handler is allowed. Both %s and %s are profile handlers", profileHandler.TypedName().Name, pluginName) | ||
| return nil, fmt.Errorf("multiple profile handlers found ('%s', '%s'); only one is allowed", | ||
| profileHandler.TypedName().Name, name) | ||
| } | ||
| profileHandler = theProfileHandler | ||
| profileHandler = ph | ||
| } | ||
| } | ||
|
|
||
| if profileHandler == nil { | ||
| return nil, errors.New("no profile handler was specified") | ||
| return nil, errors.New("no profile handler configured") | ||
| } | ||
|
|
||
| // Validate SingleProfileHandler usage. | ||
| if profileHandler.TypedName().Type == profile.SingleProfileHandlerType && len(profiles) > 1 { | ||
| return nil, errors.New("single profile handler is intended to be used with a single profile, but multiple profiles were specified") | ||
| return nil, errors.New("SingleProfileHandler cannot support multiple scheduling profiles") | ||
| } | ||
|
|
||
| return scheduling.NewSchedulerConfig(profileHandler, profiles), nil | ||
| } | ||
|
|
||
| func loadFeatureConfig(featureGates configapi.FeatureGates) map[string]bool { | ||
| featureConfig := map[string]bool{} | ||
|
|
||
| func loadFeatureConfig(gates configapi.FeatureGates) map[string]bool { | ||
| // Initialize with all false. | ||
| config := make(map[string]bool, len(registeredFeatureGates)) | ||
| for gate := range registeredFeatureGates { | ||
| featureConfig[gate] = false | ||
| config[gate] = false | ||
| } | ||
|
|
||
| for _, gate := range featureGates { | ||
| featureConfig[gate] = true | ||
| // Apply overrides. | ||
| for _, gate := range gates { | ||
| config[gate] = true | ||
| } | ||
|
|
||
| return featureConfig | ||
| return config | ||
| } | ||
|
|
||
| func loadSaturationDetectorConfig(sd *configapi.SaturationDetector) *saturationdetector.Config { | ||
| sdConfig := saturationdetector.Config{} | ||
|
|
||
| sdConfig.QueueDepthThreshold = sd.QueueDepthThreshold | ||
| if sdConfig.QueueDepthThreshold <= 0 { | ||
| sdConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold | ||
| } | ||
| sdConfig.KVCacheUtilThreshold = sd.KVCacheUtilThreshold | ||
| if sdConfig.KVCacheUtilThreshold <= 0.0 || sdConfig.KVCacheUtilThreshold >= 1.0 { | ||
| sdConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold | ||
| func buildSaturationConfig(apiConfig *configapi.SaturationDetector) *saturationdetector.Config { | ||
| // 1. Initialize with Defaults | ||
| cfg := &saturationdetector.Config{ | ||
| QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold, | ||
| KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold, | ||
| MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold, | ||
| } | ||
| sdConfig.MetricsStalenessThreshold = sd.MetricsStalenessThreshold.Duration | ||
| if sdConfig.MetricsStalenessThreshold <= 0.0 { | ||
| sdConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold | ||
| } | ||
|
|
||
| return &sdConfig | ||
| } | ||
|
|
||
| func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error { | ||
| pluginNames := sets.New[string]() // set of plugin names, a name must be unique | ||
|
|
||
| for _, pluginConfig := range configuredPlugins { | ||
| if pluginConfig.Type == "" { | ||
| return fmt.Errorf("plugin definition for '%s' is missing a type", pluginConfig.Name) | ||
| // 2. Apply Overrides (if valid) | ||
| if apiConfig != nil { | ||
| if apiConfig.QueueDepthThreshold > 0 { | ||
| cfg.QueueDepthThreshold = apiConfig.QueueDepthThreshold | ||
| } | ||
|
|
||
| if pluginNames.Has(pluginConfig.Name) { | ||
| return fmt.Errorf("plugin name '%s' used more than once", pluginConfig.Name) | ||
| if apiConfig.KVCacheUtilThreshold > 0.0 && apiConfig.KVCacheUtilThreshold < 1.0 { | ||
| cfg.KVCacheUtilThreshold = apiConfig.KVCacheUtilThreshold | ||
| } | ||
| pluginNames.Insert(pluginConfig.Name) | ||
|
|
||
| factory, ok := plugins.Registry[pluginConfig.Type] | ||
| if !ok { | ||
| return fmt.Errorf("plugin type '%s' is not found in registry", pluginConfig.Type) | ||
| } | ||
|
|
||
| plugin, err := factory(pluginConfig.Name, pluginConfig.Parameters, handle) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to instantiate the plugin type '%s' - %w", pluginConfig.Type, err) | ||
| if apiConfig.MetricsStalenessThreshold.Duration > 0 { | ||
| cfg.MetricsStalenessThreshold = apiConfig.MetricsStalenessThreshold.Duration | ||
| } | ||
|
|
||
| handle.AddPlugin(pluginConfig.Name, plugin) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // RegisterFeatureGate registers feature gate keys for validation | ||
| func RegisterFeatureGate(gate string) { | ||
| registeredFeatureGates.Insert(gate) | ||
| return cfg | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this is still run as phase one and phase two, see calling function names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, I am scoping this change to config/loader package. Will clean this up in runner.go in a separate pass. For now, I just want these names to be semantically clearer. I have a draft for those runner.go changes, I can share the relevant snippet if you would like.