diff --git a/pyRadPlan/optimization/new_templates/objectives.py b/pyRadPlan/optimization/new_templates/objectives.py new file mode 100644 index 0000000..e438b58 --- /dev/null +++ b/pyRadPlan/optimization/new_templates/objectives.py @@ -0,0 +1,21 @@ +import numpy as np + + +class abstract_objective: + def __init__(self): + pass + + def evaluate(x: np.ndarray[float]) -> float: + pass + + def evaluate_gradient(x: np.ndarray[float]) -> np.ndarray[float]: + pass + + def evaluate_hessian(x: np.ndarray[float]) -> np.ndarray[float]: + pass + + def is_linear() -> bool: + pass + + def is_convex() -> bool: + pass diff --git a/pyRadPlan/optimization/new_templates/planning_problem_sketch.py b/pyRadPlan/optimization/new_templates/planning_problem_sketch.py new file mode 100644 index 0000000..71796c3 --- /dev/null +++ b/pyRadPlan/optimization/new_templates/planning_problem_sketch.py @@ -0,0 +1,53 @@ +import numpy as np +from enum import Enum + + +class PlanningProblem: + def __init__( + self, + scalarization_method: str | Enum, + tradeoff_exploration_method: str | Enum | None, + method_parameters: dict, + ): + pass + + def evaluate_objectives(x: np.ndarray[float]) -> list[np.ndarray[float]]: + pass + + def evaluate_objective_gradients(x: np.ndarray[float]) -> list[np.ndarray[float]]: + # returns list of 2d arrays + pass + + def evaluate_objective_hessian(x: np.ndarray[float]) -> list[np.ndarray[float]]: + # returns list of 3d arrays + pass + + def evaluate_constraints(x: np.ndarray[float]) -> list[np.ndarray[float]]: + pass + + def evaluate_constraints_gradients(x: np.ndarray[float]) -> list[np.ndarray[float]]: + # returns list of 2d arrays + pass + + def evaluate_constraints_hessian(x: np.ndarray[float]) -> list[np.ndarray[float]]: + # returns list of 3d arrays + pass + + def are_objectives_convex() -> bool: + pass + + def are_objectives_linear() -> bool: + pass + + def solve(y: np.ndarray[float]) -> list[np.ndarray[float]]: + pass + + def make_tradeoff_exploration_method_instance( + evaluate_objectives: callable[np.ndarray[float], list[np.ndarray[float]]], + evaluate_constraints: callable[np.ndarray[float], list[np.ndarray[float]]], + evaluate_x_gradients, + etc, + scalarization_method: str, + method_parameters: dict, + ) -> TradeoffExplorationMethod: + pass diff --git a/pyRadPlan/optimization/new_templates/scalarization_method.py b/pyRadPlan/optimization/new_templates/scalarization_method.py new file mode 100644 index 0000000..f5a8774 --- /dev/null +++ b/pyRadPlan/optimization/new_templates/scalarization_method.py @@ -0,0 +1,44 @@ +import numpy as np + + +class ScalarizationMethod: + def __init__( + self, + evaluate_objectives: callable[np.ndarray[float], list[np.ndarray[float]]], + evaluate_constraints: callable[np.ndarray[float], list[np.ndarray[float]]], + evaluate_x_gradients, + etc, + parameters: dict, + ): + # Implementations of class should also manage the concatenating the additional variables and separating them + pass + + def variable_upper_bounds() -> np.ndarray[float]: + pass + + def variable_lower_bounds() -> np.ndarray[float]: + pass + + def get_linear_constrains(self) -> dict[Index, LinearConstraint]: + pass + + def get_nonlinear_constraints(self) -> dict[Index, NonlinearConstraint]: + pass + + def evaluate_objective(x: np.ndarray) -> float: + print("This is not the same as evaluating objectives. E.g. make weighted sum") + + def evaluate_constraints(x: np.ndarray) -> np.ndarray: + print( + "Most of the time this will be the objective constraints and the constraints from the scalarization method" + ) + + def solve(self, x: np.ndarray[float]) -> np.ndarray[float]: + print("Do something") + self._call_solver_interface(self.solver, self.solver_params) + + def is_objective_convex() -> bool: + pass + + def _call_solver_interface(solver: str, params: dict) -> np.ndarray[float]: + pass diff --git a/pyRadPlan/optimization/new_templates/tradeoff_exploration_method.py b/pyRadPlan/optimization/new_templates/tradeoff_exploration_method.py new file mode 100644 index 0000000..e943b02 --- /dev/null +++ b/pyRadPlan/optimization/new_templates/tradeoff_exploration_method.py @@ -0,0 +1,9 @@ +import numpy as np + + +class TradeoffExplorationMethod: + def __init__(self, params: dict): + pass + + def solve(x: np.ndarray[float]) -> list[np.ndarray[float]]: + pass diff --git a/pyRadPlan/optimization/objectives/_max_dvh.py b/pyRadPlan/optimization/objectives/_max_dvh.py index 580fcfe..efe858b 100644 --- a/pyRadPlan/optimization/objectives/_max_dvh.py +++ b/pyRadPlan/optimization/objectives/_max_dvh.py @@ -22,7 +22,8 @@ class MaxDVH(Objective): """ name = "Max DVH" - + is_convex = False + is_linear = True d: Annotated[float, Field(default=30.0, ge=0.0), ParameterMetadata(kind="reference")] v_max: Annotated[ float, Field(default=50.0, ge=0.0, le=100.0), ParameterMetadata(kind="relative_volume") diff --git a/pyRadPlan/optimization/objectives/_min_dvh.py b/pyRadPlan/optimization/objectives/_min_dvh.py index f1ab0f1..aab72b1 100644 --- a/pyRadPlan/optimization/objectives/_min_dvh.py +++ b/pyRadPlan/optimization/objectives/_min_dvh.py @@ -22,7 +22,8 @@ class MinDVH(Objective): """ name = "Min DVH" - + is_convex = False + is_linear = True d: Annotated[float, Field(default=30.0, ge=0.0), ParameterMetadata(kind="reference")] v_min: Annotated[ float, Field(default=95.0, ge=0.0, le=100.0), ParameterMetadata(kind="relative_volume") diff --git a/pyRadPlan/optimization/objectives/_objective.py b/pyRadPlan/optimization/objectives/_objective.py index d1f8ab5..b6cbdae 100644 --- a/pyRadPlan/optimization/objectives/_objective.py +++ b/pyRadPlan/optimization/objectives/_objective.py @@ -72,10 +72,18 @@ class Objective(PyRadPlanBaseModel): Weight/Priority assigned to the objective function. quantity : str The quantity this objective is connected to (e.g. 'physical_dose', 'RBExDose'). + is_linear : bool + bool to indicate if the objective is linear. + is_convex : bool + bool to indicate if the objective is convex. """ name: ClassVar[str] has_hessian: ClassVar[bool] = False + + is_linear: ClassVar[bool] = False + is_convex: ClassVar[bool] = True + priority: float = Field(default=1.0, ge=0.0, alias="penalty") quantity: str = Field(default="physical_dose") diff --git a/pyRadPlan/optimization/problems/_nonlin_fluence.py b/pyRadPlan/optimization/problems/_nonlin_fluence.py index 36e44ff..247c974 100644 --- a/pyRadPlan/optimization/problems/_nonlin_fluence.py +++ b/pyRadPlan/optimization/problems/_nonlin_fluence.py @@ -8,7 +8,6 @@ from ...plan import Plan from ._optiprob import NonLinearPlanningProblem -from ..solvers import NonLinearOptimizer from ..objectives import Objective logger = logging.getLogger(__name__) @@ -37,15 +36,13 @@ class NonLinearFluencePlanningProblem(NonLinearPlanningProblem): bypass_objective_jacobian: bool def __init__(self, pln: Union[Plan, dict] = None): - self.bypass_objective_jacobian = True + self.bypass_objective_jacobian = False self._target_voxels = None self._patient_voxels = None self._grad_cache_intermediate = None self._grad_cache = None - self._obj_times = [] - self._deriv_times = [] self._solve_time = None super().__init__(pln) @@ -54,17 +51,17 @@ def _initialize(self): """Initialize this problem.""" super()._initialize() - # Check if the solver is adequate to solve this problem - # TODO: check that it can do constraints - if not isinstance(self.solver, NonLinearOptimizer): - raise ValueError("Solver must be an instance of SolverBase") - - self.solver.objective = self._objective_function - self.solver.gradient = self._objective_gradient - self.solver.bounds = (0.0, np.inf) - self.solver.max_iter = 500 - - def _objective_functions(self, x: NDArray) -> NDArray: + # # Check if the solver is adequate to solve this problem + # # TODO: check that it can do constraints + # # if not isinstance(self.solver, NonLinearOptimizer): + # # raise ValueError("Solver must be an instance of SolverBase") + # self.solver = get_solver(self.solver) + # self.solver.objective = self._objective_function + # self.solver.gradient = self._objective_gradient + # self.solver.bounds = (0.0, np.inf) + # self.solver.max_iter = 500 + + def _evaluate_objective_functions(self, x: NDArray) -> NDArray: """Define the objective functions.""" q_vectors = {} @@ -85,8 +82,7 @@ def _objective_functions(self, x: NDArray) -> NDArray: f_vals.append( sum( [ - obj.priority - * obj.compute_objective(q_vectors[obj.quantity].flat[scen_ix][ix]) + obj.compute_objective(q_vectors[obj.quantity].flat[scen_ix][ix]) for scen_ix in q_scenarios[obj.quantity] ] ) @@ -95,13 +91,11 @@ def _objective_functions(self, x: NDArray) -> NDArray: # return as numpy array return np.asarray(f_vals, dtype=np.float64) - def _objective_function(self, x: NDArray) -> np.float64: - t = time.time() - f = np.sum(self._objective_functions(x)) - self._obj_times.append(time.time() - t) - return f + # def _objective_function(self, x: NDArray) -> np.float64: + # f = np.sum(self._evaluate_objective_functions(x)) + # return f - def _objective_jacobian(self, x: NDArray) -> NDArray: + def _evaluate_objective_jacobian(self, x: NDArray) -> NDArray: """Define the objective jacobian.""" q_vectors = {} @@ -142,8 +136,7 @@ def _objective_jacobian(self, x: NDArray) -> NDArray: q_cache_index = self._q_cache_index[cnt] for scen_ix in q_scenarios[obj.quantity]: self._grad_cache_intermediate[obj.quantity][q_cache_index, ix] += ( - obj.priority - * obj.compute_gradient(q_vectors[obj.quantity].flat[scen_ix][ix]) + obj.compute_gradient(q_vectors[obj.quantity].flat[scen_ix][ix]) ) cnt += 1 @@ -175,55 +168,35 @@ def _objective_jacobian(self, x: NDArray) -> NDArray: return self._grad_cache - def _objective_gradient(self, x: NDArray) -> NDArray: - t = time.time() - jac = np.sum(self._objective_jacobian(x), axis=0) - self._deriv_times.append(time.time() - t) - return jac - - def _objective_hessian(self, x: NDArray) -> NDArray: + def _evaluate_objective_hessian(self, x: NDArray) -> NDArray: """Define the objective hessian.""" return {} - def _constraint_functions(self, x: NDArray) -> NDArray: + def _evaluate_constraint_functions(self, x: NDArray) -> NDArray: """Define the constraint functions.""" return None - def _constraint_jacobian(self, x: NDArray) -> NDArray: + def _evaluate_constraint_jacobian(self, x: NDArray) -> NDArray: """Define the constraint jacobian.""" return None - def _constraint_jacobian_structure(self) -> NDArray: + def _evaluate_constraint_jacobian_structure(self) -> NDArray: """Define the constraint jacobian structure.""" return None - def _variable_bounds(self, x: NDArray) -> NDArray: + def _get_variable_bounds(self, x: NDArray) -> NDArray: """Define the variable bounds.""" return {} def _solve(self) -> tuple[NDArray, dict]: """Solve the problem.""" - self._deriv_times = [] - self._obj_times = [] - x0 = np.zeros((self._dij.total_num_of_bixels,), dtype=np.float64) t = time.time() - result = self.solver.solve(x0) + result = self.tradeoff_strategy.solve(x0) + # result = self.solver.solve(x0) self._solve_time = time.time() - t - logger.info( - "%d Objective function evaluations, avg. time: %g +/- %g s", - len(self._obj_times), - np.mean(self._obj_times), - np.std(self._obj_times), - ) - logger.info( - "%d Derivative evaluations, avg. time: %g +/- %g s", - len(self._deriv_times), - np.mean(self._deriv_times), - np.std(self._deriv_times), - ) logger.info("Solver time: %g s", self._solve_time) return result diff --git a/pyRadPlan/optimization/problems/_optiprob.py b/pyRadPlan/optimization/problems/_optiprob.py index d3ab4ae..bcbb942 100644 --- a/pyRadPlan/optimization/problems/_optiprob.py +++ b/pyRadPlan/optimization/problems/_optiprob.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod -from typing import Union, ClassVar +from typing import ClassVar, Union + import warnings import logging @@ -16,7 +17,15 @@ from pyRadPlan.quantities import FluenceDependentQuantity, get_quantity from ..objectives import get_objective -from ..solvers import get_available_solvers, get_solver, SolverBase +from ..solvers import get_available_solvers +from ..strategies.scalarization_strategies import ( + get_available_scalarization_strategies, +) # TODO: Name to change +from ..strategies.tradeoff_strategies import ( + get_available_tradeoff_strategies, + get_tradeoff_strategy, + TradeoffStrategyBase, +) # TODO: Name to change logger = logging.getLogger(__name__) @@ -49,7 +58,9 @@ class PlanningProblem(ABC): possible_radiation_modes: list[str] = ["photons", "protons", "helium", "carbon", "oxygen"] apply_overlap: bool - solver: Union[str, dict, SolverBase] + solver: Union[str, dict] + tradeoff_strategy: Union[str, dict, TradeoffStrategyBase] # TODO: Name to change + scalarization_strategy: Union[str, dict] # TODO: Name to change # Private properties _ct: CT @@ -67,7 +78,8 @@ class PlanningProblem(ABC): def __init__(self, pln: Union[Plan, dict] = None): self._scenario_model = None - + self.scalarization_strategy = "weighted_sum" # TODO: Name to change + self.tradeoff_strategy = "single" # TODO: Name to change self.solver = "ipopt" self.apply_overlap = True @@ -167,17 +179,19 @@ def _initialize(self): # sanitize objectives and constraints and manage required quantities objectives = [] quantity_ids = [] + priorities = [] for voi in self._cst.vois: if len(voi.objectives) > 0: # get the index list cube_ix = voi.indices_numpy objs = [get_objective(obj) for obj in voi.objectives] - objectives.append((cube_ix, objs)) quantity_ids.extend([obj.quantity for obj in objs]) + priorities.extend([obj.priority for obj in objs]) self._objective_list = objectives + self._priorities = np.array(priorities) # unique quantities quantity_ids = list(set(quantity_ids)) @@ -221,9 +235,7 @@ def _initialize(self): # self._quantities = quantity_obj_info # set solver options - self.solver = get_solver(self.solver) - - # initial point + # self.solver = get_solver(self.solver) def solve( self, @@ -263,33 +275,86 @@ def solve( return self._solve() -class NonLinearPlanningProblem(PlanningProblem): - """Abstract Class for all Treatment Planning Problems.""" +class InversePlanningProblem(PlanningProblem): + def __init__(self, pln: Union[Plan, dict] = None): + super().__init__(pln) + # TODO: set up the scalarization strategy and tradeoff strategy + # check tradeoff strategy + tradeoff_strategies = get_available_tradeoff_strategies() + if self.tradeoff_strategy not in tradeoff_strategies: + tradeoff_names = list(tradeoff_strategies.keys()) + warnings.warn( + f"Tradeoff strategy {self.tradeoff_strategy} not available. Choose from {tradeoff_strategies}" + ", and we will choose the first available one for you!" + ) + + self.tradeoff_strategy = tradeoff_names[0] + + # check scalarization strategy + scalarization_strategies = get_available_scalarization_strategies() + if self.scalarization_strategy not in scalarization_strategies: + scalarization_names = list(scalarization_strategies.keys()) + warnings.warn( + f"Scalarization strategy {self.scalarization_strategy} not available. Choose from {scalarization_strategies}" + ", and we will choose the first available one for you!" + ) + + self.scalarization_strategy = scalarization_names[0] + + # generate a dictionary with callbacks that can be passed to strategies + self.callbacks = { + "evaluate_objective_functions": self._evaluate_objective_functions, + "evaluate_objective_jacobian": self._evaluate_objective_jacobian, + "evaluate_objective_hessian": self._evaluate_objective_hessian, + "evaluate_constraint_functions": self._evaluate_constraint_functions, + "evaluate_constraint_jacobian": self._evaluate_constraint_jacobian, + "get_constraint_jacobian_structure": self._get_constraint_jacobian_structure, + "get_variable_bounds": self._get_variable_bounds, + } + + def _initialize(self): + super()._initialize() + + # set scalarization method (options?) + # scalarization_strategy = get_scalarization_strategy(self.scalarization_strategy,self.callbacks)#TODO: Pass options + + # set tradeoff strategy (options?) + self.tradeoff_strategy = get_tradeoff_strategy( + self.tradeoff_strategy, + self.callbacks, + self.scalarization_strategy, + self._priorities, + self.solver, + ) @abstractmethod - def _objective_functions(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_functions(self, x: np.ndarray) -> np.ndarray: """Define the objective functions.""" @abstractmethod - def _objective_jacobian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_jacobian(self, x: np.ndarray) -> np.ndarray: """Define the objective jacobian.""" - def _objective_hessian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_hessian(self, x: np.ndarray) -> np.ndarray: """Define the objective hessian.""" return {} - def _constraint_functions(self, x: np.ndarray) -> np.ndarray: + def _evaluate_constraint_functions(self, x: np.ndarray) -> np.ndarray: """Define the constraint functions.""" return None - def _constraint_jacobian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_constraint_jacobian(self, x: np.ndarray) -> np.ndarray: """Define the constraint jacobian.""" return None - def _constraint_jacobian_structure(self) -> np.ndarray: + def _get_constraint_jacobian_structure(self) -> np.ndarray: """Define the constraint jacobian structure.""" return None - def _variable_bounds(self, x: np.ndarray) -> np.ndarray: + def _get_variable_bounds(self, x: np.ndarray) -> np.ndarray: """Define the variable bounds.""" return np.array([0.0, np.inf], dtype=np.float64) + + +class NonLinearPlanningProblem(InversePlanningProblem): + """Abstract Class for all Treatment Planning Problems.""" diff --git a/pyRadPlan/optimization/problems/_simple_least_squares.py b/pyRadPlan/optimization/problems/_simple_least_squares.py index 2a9e51f..ed23815 100644 --- a/pyRadPlan/optimization/problems/_simple_least_squares.py +++ b/pyRadPlan/optimization/problems/_simple_least_squares.py @@ -45,7 +45,7 @@ def _initialize(self): "ftol": 1e-4, } - def _objective_functions(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_functions(self, x: np.ndarray) -> np.ndarray: """Define the objective functions.""" if not np.array_equal(x, self._w_cache): @@ -61,9 +61,9 @@ def _objective_functions(self, x: np.ndarray) -> np.ndarray: return np.array([target_fun, patient_fun]) def _objective_function(self, x: np.ndarray) -> np.float64: - return np.dot(np.asarray(self.penalties), self._objective_functions(x)) + return np.dot(np.asarray(self.penalties), self._evaluate_objective_functions(x)) - def _objective_jacobian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_jacobian(self, x: np.ndarray) -> np.ndarray: """Define the objective jacobian.""" if not np.array_equal(x, self._w_cache): @@ -89,25 +89,25 @@ def _objective_jacobian(self, x: np.ndarray) -> np.ndarray: return self._grad_cache def _objective_gradient(self, x: np.ndarray) -> np.ndarray: - return np.sum(self._objective_jacobian(x) * self.penalties, axis=1) + return np.sum(self._evaluate_objective_jacobian(x) * self.penalties, axis=1) - def _objective_hessian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_objective_hessian(self, x: np.ndarray) -> np.ndarray: """Define the objective hessian.""" return {} - def _constraint_functions(self, x: np.ndarray) -> np.ndarray: + def _evaluate_constraint_functions(self, x: np.ndarray) -> np.ndarray: """Define the constraint functions.""" return None - def _constraint_jacobian(self, x: np.ndarray) -> np.ndarray: + def _evaluate_constraint_jacobian(self, x: np.ndarray) -> np.ndarray: """Define the constraint jacobian.""" return None - def _constraint_jacobian_structure(self) -> np.ndarray: + def _evaluate_constraint_jacobian_structure(self) -> np.ndarray: """Define the constraint jacobian structure.""" return None - def _variable_bounds(self, x: np.ndarray) -> np.ndarray: + def _get_variable_bounds(self, x: np.ndarray) -> np.ndarray: """Define the variable bounds.""" return {} diff --git a/pyRadPlan/optimization/strategies/__init__.py b/pyRadPlan/optimization/strategies/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyRadPlan/optimization/strategies/scalarization_strategies/__init__.py b/pyRadPlan/optimization/strategies/scalarization_strategies/__init__.py new file mode 100644 index 0000000..87329f5 --- /dev/null +++ b/pyRadPlan/optimization/strategies/scalarization_strategies/__init__.py @@ -0,0 +1,22 @@ +"""Scalarization strategy module providing different scalarization strategies for pyRadPlan.""" + +from ._factory import ( + register_scalarization_strategy, + get_available_scalarization_strategies, + get_scalarization_strategy, +) + + +from ._base_scalarization_strategies import ScalarizationStrategyBase +from ._weighted_sum import WeightedSum + +register_scalarization_strategy(WeightedSum) + + +__all__ = [ + "ScalarizationStrategyBase", + "WeightedSum", + "register_scalarization_strategy", + "get_available_scalarization_strategies", + "get_scalarization_strategy", +] diff --git a/pyRadPlan/optimization/strategies/scalarization_strategies/_base_scalarization_strategies.py b/pyRadPlan/optimization/strategies/scalarization_strategies/_base_scalarization_strategies.py new file mode 100644 index 0000000..558167c --- /dev/null +++ b/pyRadPlan/optimization/strategies/scalarization_strategies/_base_scalarization_strategies.py @@ -0,0 +1,103 @@ +"""Scalarization Strategy Base Classes for Planning Problems.""" + +from typing import ClassVar, Callable, Union +from abc import ABC, abstractmethod +import numpy as np +from ...solvers import get_solver, SolverBase + + +class ScalarizationStrategyBase(ABC): + """ + Abstract Base Class for Scalarization Strategy Implementations / Interfaces. + + Parameters + ---------- + callbacks : dict[str, Callable] + Functions in the planning problem that are required for the actual optimization + + Attributes + ---------- + name : ClassVar[str] + Full name of the scalarization strategy + short_name : ClassVar[str] + Short name of the scalarization strategy + callbacks : dict[str, Callable] + Functions in the planning problem that are required for the actual optimization + solver : Union[str, dict, SolverBase] + Solver to be used for optimization + """ + + name: ClassVar[str] + short_name: ClassVar[str] + # scalarization_params: dict + callbacks: dict[str, Callable] + solver: Union[str, dict, SolverBase] + + # properties + # TODO + + def __init__( + self, + scalarization_model_params, # TODO: Define type + callbacks: dict[str, Callable], + solver: Union[str, dict], + ): + # Implementations of class should also manage the concatenating the additional variables and separating them + self.scalarization_model_params = scalarization_model_params + self.callbacks = callbacks + self.solver = solver + self._obj_times = [] + self._deriv_times = [] + + def __repr__(self) -> str: + return f"Scalarization Strategy {self.name} ({self.short_name})" + + def _initialize(self): + self.solver = get_solver(self.solver) + + @abstractmethod + def variable_upper_bounds() -> np.ndarray[float]: + pass + + @abstractmethod + def variable_lower_bounds() -> np.ndarray[float]: + pass + + @abstractmethod + def get_linear_constraints(self): # -> dict[Index, LinearConstraint]: + pass + + @abstractmethod + def get_nonlinear_constraints(self): # -> dict[Index, NonlinearConstraint]: + pass + + @abstractmethod + def evaluate_objective(x: np.ndarray) -> float: + print("This is not the same as evaluating objectives. E.g. make weighted sum") + + @abstractmethod + def evaluate_constraints(x: np.ndarray) -> np.ndarray: + print( + "Most of the time this will be the objective constraints and the constraints from the scalarization method" + ) + + # def solve(self, x: np.ndarray[float]) -> np.ndarray[float]: + # print("Do something") + # self._call_solver_interface(self.solver, self.solver_params) + + def is_objective_convex() -> bool: + pass + + def solve(self, x: np.ndarray[float]) -> np.ndarray[float]: + self._initialize() + self._obj_times = [] + self._deriv_times = [] + + return self._solve(x) + + @abstractmethod + def _solve(self, x: np.ndarray[float]) -> np.ndarray[float]: + pass + + def _call_solver_interface(solver: str, params: dict) -> np.ndarray[float]: + pass diff --git a/pyRadPlan/optimization/strategies/scalarization_strategies/_factory.py b/pyRadPlan/optimization/strategies/scalarization_strategies/_factory.py new file mode 100644 index 0000000..a7bc2e8 --- /dev/null +++ b/pyRadPlan/optimization/strategies/scalarization_strategies/_factory.py @@ -0,0 +1,81 @@ +"""Factory methods to manage available scalarization strategy implementations.""" + +import warnings +import logging +from typing import Union, Type +from ._base_scalarization_strategies import ScalarizationStrategyBase + +SCALARIZATIONSTRATEGIES = {} + +logger = logging.getLogger(__name__) + + +def register_scalarization_strategy(scalarization_cls: Type[ScalarizationStrategyBase]) -> None: + """ + Register a new scalarization strategy. + + Parameters + ---------- + scalarization_cls : type + A Scalarization Strategy class. + """ + if not issubclass(scalarization_cls, ScalarizationStrategyBase): + raise ValueError("Scalarization strategy must be a subclass of ScalarizationStrategyBase.") + + if scalarization_cls.short_name is None: + raise ValueError("Scalarization strategy must have a 'short_name' attribute.") + + if scalarization_cls.name is None: + raise ValueError("Scalarization strategy must have a 'name' attribute.") + + scalarization_name = scalarization_cls.short_name + if scalarization_name in SCALARIZATIONSTRATEGIES: + warnings.warn(f"Scalarization strategy '{scalarization_name}' is already registered.") + else: + SCALARIZATIONSTRATEGIES[scalarization_name] = scalarization_cls + + +def get_available_scalarization_strategies() -> dict[str]: + """ + Get a list of available scalarization strategies based on the plan. + + Returns + ------- + list + A list of available scalarization strategies. + """ + return SCALARIZATIONSTRATEGIES + + +def get_scalarization_strategy( + scalarization_desc: Union[str, dict], + scalarization_model_params, # TODO: Define type, + eval_callbacks: dict, + solver_desc: Union[str, dict], +) -> ScalarizationStrategyBase: + """ + Returns a scalarization strategy instance based on a descriptive parameter. + + Parameters + ---------- + scalarization_desc : Union[str, dict] + A string with the strategy name, or a dictionary with the strategy configuration + + Returns + ------- + ScalarizationStrategyBase + A strategy instance + """ + if isinstance(scalarization_desc, str): + strategy = SCALARIZATIONSTRATEGIES[scalarization_desc]( + scalarization_model_params, eval_callbacks, solver_desc + ) + + elif isinstance(scalarization_desc, dict): + raise NotImplementedError( + "Scalarization strategy configuration from dictionary not implemented yet." + ) + else: + raise ValueError(f"Invalid scalarization strategy description: {scalarization_desc}") + + return strategy diff --git a/pyRadPlan/optimization/strategies/scalarization_strategies/_lexicographic.py b/pyRadPlan/optimization/strategies/scalarization_strategies/_lexicographic.py new file mode 100644 index 0000000..e69de29 diff --git a/pyRadPlan/optimization/strategies/scalarization_strategies/_weighted_sum.py b/pyRadPlan/optimization/strategies/scalarization_strategies/_weighted_sum.py new file mode 100644 index 0000000..2468edc --- /dev/null +++ b/pyRadPlan/optimization/strategies/scalarization_strategies/_weighted_sum.py @@ -0,0 +1,87 @@ +from typing import Union +import numpy as np +import time +import logging + + +from ._base_scalarization_strategies import ScalarizationStrategyBase + +logger = logging.getLogger(__name__) + + +class WeightedSum(ScalarizationStrategyBase): + name = "Weighted sum scalarization strategy" + short_name = "weighted_sum" + weights: Union[np.ndarray[float], list[float], None] + + def __init__( + self, scalarization_model_params, callbacks: dict[str, callable], solver: Union[str, dict] + ): + # weights: Union[np.ndarray[float],list[float]] = None): + # functions in the planning problem that are required for the actual optimization + super().__init__(scalarization_model_params, callbacks, solver) + # self.weights = np.asarray(weights) + + def variable_lower_bounds(self): + return self.callbacks["get_variable_bounds"]()[0] + + def variable_upper_bounds(self): + return self.callbacks["get_variable_bounds"]()[1] + + def get_linear_constraints(self): + pass + + def get_nonlinear_constraints(self): + pass + + def evaluate_objective(x): + pass + + def evaluate_objective_jacobian(x): + pass + + def _evaluate_objective_function(self, x: np.ndarray) -> np.float64: + t = time.time() + f = self.scalarization_model_params @ self.callbacks["evaluate_objective_functions"](x) + self._obj_times.append(time.time() - t) + + # self.weights@self.callbacks["evaluate_objective_functions"](x) + return f + + def _evaluate_objective_gradient(self, x: np.ndarray) -> np.ndarray: + t = time.time() + + jac = np.sum( + self.callbacks["evaluate_objective_jacobian"](x) + * self.scalarization_model_params[:, None], + axis=0, + ) + self._deriv_times.append(time.time() - t) + return jac + + def evaluate_constraints(self, x): + return self.callbacks["evaluate_constraints"](x) + + def _solve(self, x: np.ndarray[float]) -> list[np.ndarray[float]]: + self.solver.objective = self._evaluate_objective_function + self.solver.gradient = self._evaluate_objective_gradient + self.solver.bounds = (0.0, np.inf) + self.solver.max_iter = 500 + + result = self.solver.solve(x) + + # needs to be moved + logger.info( + "%d Objective function evaluations, avg. time: %g +/- %g s", + len(self._obj_times), + np.mean(self._obj_times), + np.std(self._obj_times), + ) + logger.info( + "%d Derivative evaluations, avg. time: %g +/- %g s", + len(self._deriv_times), + np.mean(self._deriv_times), + np.std(self._deriv_times), + ) + + return result diff --git a/pyRadPlan/optimization/strategies/tradeoff_strategies/__init__.py b/pyRadPlan/optimization/strategies/tradeoff_strategies/__init__.py new file mode 100644 index 0000000..0042df8 --- /dev/null +++ b/pyRadPlan/optimization/strategies/tradeoff_strategies/__init__.py @@ -0,0 +1,20 @@ +"""Tradeoff strategy module providing different tradeoff strategies for pyRadPlan.""" + +from ._factory import ( + register_tradeoff_strategy, + get_available_tradeoff_strategies, + get_tradeoff_strategy, +) +from ._base_tradeoff_strategies import TradeoffStrategyBase +from ._single_plan import SinglePlan + +register_tradeoff_strategy(SinglePlan) + + +__all__ = [ + "TradeoffStrategyBase", + "SinglePlan", + "register_tradeoff_strategy", + "get_available_tradeoff_strategies", + "get_tradeoff_strategy", +] diff --git a/pyRadPlan/optimization/strategies/tradeoff_strategies/_base_tradeoff_strategies.py b/pyRadPlan/optimization/strategies/tradeoff_strategies/_base_tradeoff_strategies.py new file mode 100644 index 0000000..d46ff0b --- /dev/null +++ b/pyRadPlan/optimization/strategies/tradeoff_strategies/_base_tradeoff_strategies.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod +from typing import ClassVar, Union +import numpy as np +from ..scalarization_strategies import ScalarizationStrategyBase, get_scalarization_strategy + + +class TradeoffStrategyBase(ABC): + """ + To be written later + Abstract class for tradeoff exploration methods + + Parameters + ---------- + + Attributes + ---------- + """ + + short_name: ClassVar[str] + name: ClassVar[str] + scalarization_strategy: Union[str, dict, ScalarizationStrategyBase] + # scalarization_model_params, #TODO: Define type + callbacks: dict[str, callable] + solver: Union[str, dict] + + def __init__( + self, + callbacks: dict[str, callable], + scalarization_desc: Union[str, dict], + scalarization_model_params, # TODO: Define type, + solver_desc: Union[str, dict], + ): + self.callbacks = callbacks + self.scalarization_strategy = scalarization_desc + self.scalarization_model_params = scalarization_model_params + self.solver = solver_desc + + def solve(self, x: np.ndarray[float]) -> list[np.ndarray[float]]: + self._initialize() + return self._solve(x) + + def _initialize(self): + self.scalarization_strategy = get_scalarization_strategy( + self.scalarization_strategy, + self.scalarization_model_params, + self.callbacks, + self.solver, + ) # TODO: Pass options + + @abstractmethod + def _solve(self, x: np.ndarray[float]) -> list[np.ndarray[float]]: + pass diff --git a/pyRadPlan/optimization/strategies/tradeoff_strategies/_factory.py b/pyRadPlan/optimization/strategies/tradeoff_strategies/_factory.py new file mode 100644 index 0000000..0b48969 --- /dev/null +++ b/pyRadPlan/optimization/strategies/tradeoff_strategies/_factory.py @@ -0,0 +1,87 @@ +"""Factory method to manage available tradeoff exploration method implementations.""" + +import warnings +import logging +from typing import Union, Type +from ._base_tradeoff_strategies import TradeoffStrategyBase + +TRADEOFFSTRATEGIES = {} + +logger = logging.getLogger(__name__) + + +def register_tradeoff_strategy(tradeoff_cls: Type[TradeoffStrategyBase]) -> None: + """ + Register a new tradeoff strategy. + + Parameters + ---------- + tradeoff_cls : type + A Tradeoff Strategy class. + """ + if not issubclass(tradeoff_cls, TradeoffStrategyBase): + raise ValueError("Tradeoff strategy must be a subclass of TradeoffStrategyBase.") + + if tradeoff_cls.short_name is None: + raise ValueError("Tradeoff strategy must have a 'short_name' attribute.") + + if tradeoff_cls.name is None: + raise ValueError("Tradeoff strategy must have a 'name' attribute.") + + tradeoff_name = tradeoff_cls.short_name + if tradeoff_name in TRADEOFFSTRATEGIES: + warnings.warn(f"Tradeoff strategy '{tradeoff_name}' is already registered.") + else: + TRADEOFFSTRATEGIES[tradeoff_name] = tradeoff_cls + + +def get_available_tradeoff_strategies() -> dict[str, Type[TradeoffStrategyBase]]: + """ + Get a list of available tradeoff strategies based on the plan. + + Returns + ------- + list + A list of available tradeoff strategies. + """ + return TRADEOFFSTRATEGIES + + +def get_tradeoff_strategy( + tradeoff_desc: Union[str, dict], + callbacks: dict[str, callable], + scalarization_desc: Union[str, dict], + scalarization_model_params, # TODO: Define type, + solver_desc: Union[str, dict], +) -> TradeoffStrategyBase: + """ + Returns a tradeoff strategy based on a descriptive parameter. + + Parameters + ---------- + tradeoff_desc : Union[str, dict] + A string with the strategy name, or a dictionary with the strategy configuration + callbacks : dict[str, callable] + A dictionary with the functions in the planning problem that are required for the actual optimization + scalarization_desc : Union[str, dict] + A scalarization strategy instance + solver_desc : Union[str, dict] + A string with the solver name, or a dictionary with the solver configuration + + Returns + ------- + TradeoffStrategyBase + A solver instance + """ + if isinstance(tradeoff_desc, str): + tradeoff_strategy = TRADEOFFSTRATEGIES[tradeoff_desc]( + callbacks, scalarization_desc, scalarization_model_params, solver_desc + ) + elif isinstance(tradeoff_desc, dict): + raise NotImplementedError( + "Tradeoff strategy configuration from dictionary not implemented yet." + ) + else: + raise ValueError(f"Invalid tradeoff strategy description: {tradeoff_desc}") + + return tradeoff_strategy diff --git a/pyRadPlan/optimization/strategies/tradeoff_strategies/_single_plan.py b/pyRadPlan/optimization/strategies/tradeoff_strategies/_single_plan.py new file mode 100644 index 0000000..89b7f88 --- /dev/null +++ b/pyRadPlan/optimization/strategies/tradeoff_strategies/_single_plan.py @@ -0,0 +1,11 @@ +import numpy as np +from ._base_tradeoff_strategies import TradeoffStrategyBase + + +class SinglePlan(TradeoffStrategyBase): + name = "Single Plan Tradeoff Strategy" + short_name = "single" + scalarization_strategy = "WeightedSum" + + def _solve(self, x: np.ndarray[float]) -> list[np.ndarray[float]]: + return self.scalarization_strategy.solve(x)