Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (r *Runner) parseConfigurationPhaseOne(ctx context.Context) (*configapi.End

r.registerInTreePlugins()

rawConfig, featureGates, err := loader.LoadConfigPhaseOne(configBytes, logger)
rawConfig, featureGates, err := loader.LoadRawConfig(configBytes, logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this still runs as phase one and phase two; see the names of the calling functions (`parseConfigurationPhaseOne` and `parseConfigurationPhaseTwo`).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, I am scoping this change to the config/loader package and will clean this up in runner.go in a separate pass. For now, I just want these names to be semantically clearer. I have a draft of those runner.go changes and can share the relevant snippet if you would like.

if err != nil {
return nil, fmt.Errorf("failed to parse config - %w", err)
}
Expand All @@ -494,7 +494,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *configapi.EndpointPickerConfig, ds datastore.Datastore) (*config.Config, error) {
logger := log.FromContext(ctx)
handle := plugins.NewEppHandle(ctx, makePodListFunc(ds))
cfg, err := loader.LoadConfigPhaseTwo(rawConfig, handle, logger)
cfg, err := loader.InstantiateAndConfigure(rawConfig, handle, logger)

if err != nil {
return nil, fmt.Errorf("failed to load the configuration - %w", err)
Expand Down
247 changes: 141 additions & 106 deletions pkg/epp/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,171 +35,206 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
)

var scheme = runtime.NewScheme()

var registeredFeatureGates = sets.New[string]() // set of feature gates names, a name must be unique
var (
scheme = runtime.NewScheme()
registeredFeatureGates = sets.New[string]()
)

func init() {
utilruntime.Must(configapi.Install(scheme))
}

// LoadConfigPhaseOne first phase of loading configuration from supplied text that was converted to []byte
func LoadConfigPhaseOne(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, map[string]bool, error) {
rawConfig, err := loadRawConfig(configBytes)
// RegisterFeatureGate adds gate to the package-level set of known feature gate names.
//
// Every name in that set is reported by loadFeatureConfig, defaulting to false unless
// the loaded configuration lists it. Inserting a name that is already present leaves
// the set unchanged, because it is a set.
//
// NOTE(review): the set is presumably also what validateFeatureGates checks configured
// gates against — confirm, since that function is not shown here.
func RegisterFeatureGate(gate string) {
registeredFeatureGates.Insert(gate)
}

// LoadRawConfig parses the raw configuration bytes, applies initial defaults, and extracts feature gates.
// It does not instantiate plugins.
func LoadRawConfig(configBytes []byte, logger logr.Logger) (*configapi.EndpointPickerConfig, map[string]bool, error) {
// Decode JSON/YAML.
rawConfig, err := decodeRawConfig(configBytes)
if err != nil {
return nil, nil, err
}
logger.V(1).Info("Loaded raw configuration", "config", rawConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we use the verbosity constants from the logging utils instead of a numeric literal (the `1` in `logger.V(1)`)?


logger.Info("Loaded configuration", "config", rawConfig)
// Sanitize data.
applyStaticDefaults(rawConfig)

if err = validateFeatureGates(rawConfig.FeatureGates); err != nil {
return nil, nil, fmt.Errorf("failed to validate feature gates - %w", err)
// Early validation of Feature Gates.
// We validate gates early because they might dictate downstream loading logic.
if err := validateFeatureGates(rawConfig.FeatureGates); err != nil {
return nil, nil, fmt.Errorf("feature gate validation failed: %w", err)
}

setDefaultsPhaseOne(rawConfig)

featureConfig := loadFeatureConfig(rawConfig.FeatureGates)

return rawConfig, featureConfig, nil
}

// LoadConfigPhaseTwo second phase of loading configuration: instantiates the plugins named in the raw config and builds the final config from it
func LoadConfigPhaseTwo(rawConfig *configapi.EndpointPickerConfig, handle plugins.Handle, logger logr.Logger) (*config.Config, error) {
var err error
// instantiate loaded plugins
if err = instantiatePlugins(rawConfig.Plugins, handle); err != nil {
return nil, fmt.Errorf("failed to instantiate plugins - %w", err)
}

setDefaultsPhaseTwo(rawConfig, handle)
// InstantiateAndConfigure performs the heavy lifting of plugin instantiation, system architecture injection, and
// scheduler construction.
func InstantiateAndConfigure(
rawConfig *configapi.EndpointPickerConfig,
handle plugins.Handle,
logger logr.Logger,
) (*config.Config, error) {

logger.Info("Configuration with defaults set", "config", rawConfig)
// Instantiate user-configured plugins.
if err := instantiatePlugins(rawConfig.Plugins, handle); err != nil {
return nil, fmt.Errorf("plugin instantiation failed: %w", err)
}

if err = validateSchedulingProfiles(rawConfig); err != nil {
return nil, fmt.Errorf("failed to validate scheduling profiles - %w", err)
// Fill in the architectural gaps (inject required system plugins).
if err := applySystemDefaults(rawConfig, handle); err != nil {
return nil, fmt.Errorf("system default application failed: %w", err)
}
logger.Info("Effective configuration loaded", "config", rawConfig)

config := &config.Config{}
// Deep validation checks relationships between now-finalized profiles and plugins.
if err := validateConfig(rawConfig); err != nil {
return nil, fmt.Errorf("configuration validation failed: %w", err)
}

config.SchedulerConfig, err = loadSchedulerConfig(rawConfig.SchedulingProfiles, handle)
// Build scheduler config.
schedulerConfig, err := buildSchedulerConfig(rawConfig.SchedulingProfiles, handle)
if err != nil {
return nil, err
return nil, fmt.Errorf("scheduler config build failed: %w", err)
}
config.SaturationDetectorConfig = loadSaturationDetectorConfig(rawConfig.SaturationDetector)

return config, nil
return &config.Config{
SchedulerConfig: schedulerConfig,
SaturationDetectorConfig: buildSaturationConfig(rawConfig.SaturationDetector),
}, nil
}

func loadRawConfig(configBytes []byte) (*configapi.EndpointPickerConfig, error) {
rawConfig := &configapi.EndpointPickerConfig{}

func decodeRawConfig(configBytes []byte) (*configapi.EndpointPickerConfig, error) {
cfg := &configapi.EndpointPickerConfig{}
codecs := serializer.NewCodecFactory(scheme, serializer.EnableStrict)
err := runtime.DecodeInto(codecs.UniversalDecoder(), configBytes, rawConfig)
if err != nil {
return nil, fmt.Errorf("the configuration is invalid - %w", err)
if err := runtime.DecodeInto(codecs.UniversalDecoder(), configBytes, cfg); err != nil {
return nil, fmt.Errorf("failed to decode configuration JSON/YAML: %w", err)
}
return rawConfig, nil
return cfg, nil
}

func loadSchedulerConfig(configProfiles []configapi.SchedulingProfile, handle plugins.Handle) (*scheduling.SchedulerConfig, error) {
profiles := map[string]*framework.SchedulerProfile{}
for _, namedProfile := range configProfiles {
profile := framework.NewSchedulerProfile()
for _, plugin := range namedProfile.Plugins {
referencedPlugin := handle.Plugin(plugin.PluginRef)
if scorer, ok := referencedPlugin.(framework.Scorer); ok {
referencedPlugin = framework.NewWeightedScorer(scorer, *plugin.Weight)
// instantiatePlugins creates every plugin listed in configuredPlugins and
// registers it on handle under the plugin's configured name.
//
// Specs are processed in order. For each one it rejects an empty Type, rejects
// a Name already seen earlier in the same call, looks up the factory for the
// Type in plugins.Registry, and invokes that factory with the spec's Name and
// Parameters. Processing stops at the first failure and that error is
// returned; plugins added to handle before the failure are not removed.
func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error {
	seen := sets.New[string]()

	for _, entry := range configuredPlugins {
		name, kind := entry.Name, entry.Type

		// Cheap structural checks come first, before touching the registry.
		switch {
		case kind == "":
			return fmt.Errorf("plugin '%s' is missing a type", name)
		case seen.Has(name):
			return fmt.Errorf("duplicate plugin name '%s'", name)
		}
		seen.Insert(name)

		newPlugin, registered := plugins.Registry[kind]
		if !registered {
			return fmt.Errorf("plugin type '%s' is not registered", kind)
		}

		instance, err := newPlugin(name, entry.Parameters, handle)
		if err != nil {
			return fmt.Errorf("failed to create plugin '%s' (type: %s): %w", name, kind, err)
		}
		handle.AddPlugin(name, instance)
	}

	return nil
}

func buildSchedulerConfig(
configProfiles []configapi.SchedulingProfile,
handle plugins.Handle,
) (*scheduling.SchedulerConfig, error) {

profiles := make(map[string]*framework.SchedulerProfile)

// Build profiles.
for _, cfgProfile := range configProfiles {
fwProfile := framework.NewSchedulerProfile()

for _, pluginRef := range cfgProfile.Plugins {
plugin := handle.Plugin(pluginRef.PluginRef)
if plugin == nil { // Should be caught by validation, but defensive check.
return nil, fmt.Errorf(
"plugin '%s' referenced in profile '%s' not found in handle",
pluginRef.PluginRef, cfgProfile.Name)
}
if err := profile.AddPlugins(referencedPlugin); err != nil {
return nil, fmt.Errorf("failed to load scheduler config - %w", err)

// Wrap Scorers with weights.
if scorer, ok := plugin.(framework.Scorer); ok {
weight := DefaultScorerWeight
if pluginRef.Weight != nil {
weight = *pluginRef.Weight
}
plugin = framework.NewWeightedScorer(scorer, weight)
}

if err := fwProfile.AddPlugins(plugin); err != nil {
return nil, fmt.Errorf("failed to add plugin '%s' to profile '%s': %w", pluginRef.PluginRef, cfgProfile.Name, err)
}
}
profiles[namedProfile.Name] = profile
profiles[cfgProfile.Name] = fwProfile
}

// Find Profile Handler (singleton check).
var profileHandler framework.ProfileHandler
for pluginName, plugin := range handle.GetAllPluginsWithNames() {
if theProfileHandler, ok := plugin.(framework.ProfileHandler); ok {
for name, plugin := range handle.GetAllPluginsWithNames() {
if ph, ok := plugin.(framework.ProfileHandler); ok {
if profileHandler != nil {
return nil, fmt.Errorf("only one profile handler is allowed. Both %s and %s are profile handlers", profileHandler.TypedName().Name, pluginName)
return nil, fmt.Errorf("multiple profile handlers found ('%s', '%s'); only one is allowed",
profileHandler.TypedName().Name, name)
}
profileHandler = theProfileHandler
profileHandler = ph
}
}

if profileHandler == nil {
return nil, errors.New("no profile handler was specified")
return nil, errors.New("no profile handler configured")
}

// Validate SingleProfileHandler usage.
if profileHandler.TypedName().Type == profile.SingleProfileHandlerType && len(profiles) > 1 {
return nil, errors.New("single profile handler is intended to be used with a single profile, but multiple profiles were specified")
return nil, errors.New("SingleProfileHandler cannot support multiple scheduling profiles")
}

return scheduling.NewSchedulerConfig(profileHandler, profiles), nil
}

func loadFeatureConfig(featureGates configapi.FeatureGates) map[string]bool {
featureConfig := map[string]bool{}

func loadFeatureConfig(gates configapi.FeatureGates) map[string]bool {
// Initialize with all false.
config := make(map[string]bool, len(registeredFeatureGates))
for gate := range registeredFeatureGates {
featureConfig[gate] = false
config[gate] = false
}

for _, gate := range featureGates {
featureConfig[gate] = true
// Apply overrides.
for _, gate := range gates {
config[gate] = true
}

return featureConfig
return config
}

func loadSaturationDetectorConfig(sd *configapi.SaturationDetector) *saturationdetector.Config {
sdConfig := saturationdetector.Config{}

sdConfig.QueueDepthThreshold = sd.QueueDepthThreshold
if sdConfig.QueueDepthThreshold <= 0 {
sdConfig.QueueDepthThreshold = saturationdetector.DefaultQueueDepthThreshold
}
sdConfig.KVCacheUtilThreshold = sd.KVCacheUtilThreshold
if sdConfig.KVCacheUtilThreshold <= 0.0 || sdConfig.KVCacheUtilThreshold >= 1.0 {
sdConfig.KVCacheUtilThreshold = saturationdetector.DefaultKVCacheUtilThreshold
func buildSaturationConfig(apiConfig *configapi.SaturationDetector) *saturationdetector.Config {
// 1. Initialize with Defaults
cfg := &saturationdetector.Config{
QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,
KVCacheUtilThreshold: saturationdetector.DefaultKVCacheUtilThreshold,
MetricsStalenessThreshold: saturationdetector.DefaultMetricsStalenessThreshold,
}
sdConfig.MetricsStalenessThreshold = sd.MetricsStalenessThreshold.Duration
if sdConfig.MetricsStalenessThreshold <= 0.0 {
sdConfig.MetricsStalenessThreshold = saturationdetector.DefaultMetricsStalenessThreshold
}

return &sdConfig
}

func instantiatePlugins(configuredPlugins []configapi.PluginSpec, handle plugins.Handle) error {
pluginNames := sets.New[string]() // set of plugin names, a name must be unique

for _, pluginConfig := range configuredPlugins {
if pluginConfig.Type == "" {
return fmt.Errorf("plugin definition for '%s' is missing a type", pluginConfig.Name)
// 2. Apply Overrides (if valid)
if apiConfig != nil {
if apiConfig.QueueDepthThreshold > 0 {
cfg.QueueDepthThreshold = apiConfig.QueueDepthThreshold
}

if pluginNames.Has(pluginConfig.Name) {
return fmt.Errorf("plugin name '%s' used more than once", pluginConfig.Name)
if apiConfig.KVCacheUtilThreshold > 0.0 && apiConfig.KVCacheUtilThreshold < 1.0 {
cfg.KVCacheUtilThreshold = apiConfig.KVCacheUtilThreshold
}
pluginNames.Insert(pluginConfig.Name)

factory, ok := plugins.Registry[pluginConfig.Type]
if !ok {
return fmt.Errorf("plugin type '%s' is not found in registry", pluginConfig.Type)
}

plugin, err := factory(pluginConfig.Name, pluginConfig.Parameters, handle)
if err != nil {
return fmt.Errorf("failed to instantiate the plugin type '%s' - %w", pluginConfig.Type, err)
if apiConfig.MetricsStalenessThreshold.Duration > 0 {
cfg.MetricsStalenessThreshold = apiConfig.MetricsStalenessThreshold.Duration
}

handle.AddPlugin(pluginConfig.Name, plugin)
}

return nil
}

// RegisterFeatureGate registers feature gate keys for validation
func RegisterFeatureGate(gate string) {
registeredFeatureGates.Insert(gate)
return cfg
}
Loading