Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 262 additions & 5 deletions corrai/sampling.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from functools import wraps
from typing import Union, Callable
Expand All @@ -7,6 +6,12 @@
import pandas as pd
import plotly.graph_objects as go
import datetime as dt
from sklearn.metrics import (
r2_score,
mean_absolute_error,
root_mean_squared_error,
max_error,
)

import plotly.figure_factory as ff

Expand All @@ -16,11 +21,21 @@
from SALib.sample import fast_sampler, latin

from corrai.base.utils import check_indicators_configs
from corrai.base.metrics import nmbe, cv_rmse
from corrai.base.parameter import Parameter
from corrai.base.model import Model
from corrai.base.math import aggregate_time_series
from corrai.base.simulate import run_simulations

SCORE_MAP = {
"r2": r2_score,
"nmbe": nmbe,
"cv_rmse": cv_rmse,
"mae": mean_absolute_error,
"rmse": root_mean_squared_error,
"max": max_error,
}


def plot_pcp(
parameter_values: pd.DataFrame,
Expand Down Expand Up @@ -405,6 +420,156 @@ def get_static_results_as_df(self):
raise ValueError("Cannot map results to a DataFrame with a dynamic Sample")
return pd.DataFrame(self.results.to_list(), index=self.results.index)

def get_score_df(
self,
indicator: str,
reference_time_series: pd.Series,
scoring_methods: list[str | Callable] = None,
resample_rule: str | pd.Timedelta | dt.timedelta = None,
agg_method: str = "mean",
) -> pd.DataFrame:
"""
Compute scoring metrics for a given indicator across all sample results.

This method evaluates the performance of dynamic model predictions by comparing
them against a reference time series. It supports multiple scoring metrics
(R², NMBE, CV(RMSE), MAE, RMSE, max error) and optional resampling of data.

Parameters
----------
indicator : str
Name of the indicator/variable to evaluate from the simulation results.
Must be a valid columns in the sample results DataFrame.
reference_time_series : pd.Series
Ground truth or measured time series data to compare against.
scoring_methods : list of str or callable, optional
List of scoring methods to apply. Can be:

- String values from ``SCORE_MAP``: ``"r2"``, ``"nmbe"``, ``"cv_rmse"``,
``"mae"``, ``"rmse"``, ``"max"``
- Custom callable functions with signature ``func(y_true, y_pred) -> float``

If None, all methods are used.
Default is None.
resample_rule : str, pd.Timedelta or dt.timedelta, optional
Resampling frequency for aggregating the time series data before scoring.
Examples: ``"D"`` (daily), ``"h"`` (hourly), ``"ME"`` (month end).
If None, no resampling is performed.
Default is None.
agg_method : str, optional
Aggregation method to use when resampling. Common values include:
``"mean"``, ``"sum"``, ``"min"``, ``"max"``, ``"median"``.
Default is ``"mean"``.

Returns
-------
pd.DataFrame
DataFrame containing scoring metrics for each sample.

- Index: sample identifiers from ``self.results``
- Columns: metric names (e.g., ``"r2_score"``, ``"nmbe"``, ``"cv_rmse"``)
- Values: computed metric values (float)

The DataFrame's index name is set to the resampling rule or the inferred
frequency of the reference time series.

Raises
------
NotImplementedError
If the model is not dynamic (``self.is_dynamic == False``).

Notes
-----
The scoring metrics available in ``SCORE_MAP`` are:

- ``r2``: R² score (coefficient of determination)
- ``nmbe``: Normalized Mean Bias Error
- ``cv_rmse``: Coefficient of Variation of Root Mean Squared Error
- ``mae``: Mean Absolute Error
- ``rmse``: Root Mean Squared Error
- ``max``: Maximum absolute error

When resampling is applied, both the predicted and reference time series are
resampled using the same rule and aggregation method to ensure alignment.

Examples
--------
Basic usage with default metrics:

>>> import pandas as pd
>>> import numpy as np
>>> # Assuming 'sample' is an instance of Sample class with results
>>> reference = pd.Series(
... np.random.randn(100),
... index=pd.date_range("2023-01-01", periods=100, freq="h"),
... )
>>> scores = sample.get_score_df(
... indicator="temperature", reference_time_series=reference
... )
>>> print(scores)
r2_score nmbe cv_rmse mae rmse max
0 0.85234 0.012345 0.234567 1.234567 1.567890 3.456789
1 0.82156 0.023456 0.345678 1.345678 1.678901 3.567890
...

Using specific metrics and daily resampling:

>>> scores = sample.get_score_df(
... indicator="Energy",
... reference_time_series=reference,
... scoring_methods=["r2", "rmse", "mae"],
... resample_rule="D",
... agg_method="sum",
... )
>>> print(scores)
r2_score rmse mae
D
0 0.91234 12.34567 10.12345
1 0.89123 13.45678 11.23456
...

See Also
--------
sklearn.metrics.r2_score : R² metric implementation
sklearn.metrics.mean_absolute_error : MAE metric implementation
sklearn.metrics.root_mean_squared_error : RMSE metric implementation
"""

if not self.is_dynamic:
raise NotImplementedError(
"get_score_df is not implemented for non dynamic models"
)

scores = pd.DataFrame()
scoring_methods = (
list(SCORE_MAP.keys()) if scoring_methods is None else scoring_methods
)

method_func = [
SCORE_MAP[method] if isinstance(method, str) else method
for method in scoring_methods
]

for idx, sample_res in self.results.items():
data = sample_res[indicator]
if resample_rule:
data = data.resample(resample_rule).agg(agg_method)
reference_time_series = reference_time_series.resample(
resample_rule
).agg(agg_method)

for method in method_func:
scores.loc[idx, method.__name__] = method(reference_time_series, data)

scores.index.name = (
resample_rule
if resample_rule
else reference_time_series.index.freq
if reference_time_series.index.freq
else reference_time_series.index.inferred_freq
)
return scores

def plot_hist(
self,
indicator: str,
Expand Down Expand Up @@ -795,7 +960,7 @@ def plot_pcp(
)


class Sampler(ABC):
class Sampler:
"""
Abstract base class for parameter samplers.

Expand Down Expand Up @@ -841,7 +1006,6 @@ def values(self):
def results(self):
return self.sample.results

@abstractmethod
def add_sample(self, *args, **kwargs) -> np.ndarray:
"""
Generate new samples and optionally run simulations.
Expand All @@ -853,7 +1017,7 @@ def add_sample(self, *args, **kwargs) -> np.ndarray:
ndarray
The newly generated sample values.
"""
pass
raise NotImplementedError("add_sample is not implemented for this class")

def _post_draw_sample(
self,
Expand All @@ -880,6 +1044,99 @@ def _post_draw_sample(
slice(sample_starts, sample_ends), n_cpu, simulation_kwargs
)

def append_sample_from_param_dict(
self, param_dict: dict[str, int | float | str], simulation_kwargs=None
):
"""
Add a new sample from a parameter dictionary and simulate it.

This method appends a single sample to the existing sample set by providing
parameter values as a dictionary. The keys must correspond to the ``name``
property of the ``Parameter`` objects in the ``parameters``. After adding
the sample, it automatically runs a simulation for the newly added parameters.

Parameters
----------
param_dict : dict of {str: int, float, or str}
Dictionary mapping parameter names to their values. Keys must match the
``name`` attribute of ``Parameter`` objects in ``self.parameters``.
All parameters must be present in the dictionary.

simulation_kwargs : dict, optional
Additional keyword arguments to pass to the simulation method.
These can include simulation-specific options such as solver settings,
output options, or other model-specific parameters.
Default is None.

Returns
-------
None
The method modifies the sample in place by adding a new row to
``self.sample`` and stores the simulation results in ``self.results``.

Raises
------
AssertionError
If any parameter name in ``param_dict`` is not found in
``self.values.columns``, indicating missing or misspelled parameter names.

Notes
-----
- The newly added sample is assigned the index ``self.values.index[-1]``,
which corresponds to the last index after appending.
- This method is useful for manual parameter exploration, adding specific
test cases, or iteratively building samples based on optimization results.

Examples
--------
Add a single sample with specific parameter values:

>>> from corrai.base.parameter import Parameter
>>> from corrai.base.model import IshigamiDynamic
>>> from corrai.sampling import LHSSampler
>>>
>>> # Define parameters
>>> params = [
... Parameter("par_x1", (-3.14, 3.14), model_property="x1"),
... Parameter("par_x2", (-3.14, 3.14), model_property="x2"),
... Parameter("par_x3", (-3.14, 3.14), model_property="x3"),
... ]
>>>
>>> # Create sample object
>>> simulation_opts = {
... "start": "2023-01-01 00:00:00",
... "end": "2023-01-01 23:00:00",
... "timestep": "h",
... }
>>> sample = LHSSampler(
... parameter_list=params,
... model=IshigamiDynamic(),
... simulation_options=simulation_opts,
... )
>>>
>>> # Add a specific sample
>>> new_params = {"par_x1": 1.5, "par_x2": -0.5, "par_x3": 2.0}
>>> sample.append_sample_from_param_dict(new_params)
>>> print(sample.values.tail(1))
par_x1 par_x2 par_x3
0 1.5 -0.5 2.0

See Also
--------
add_samples : Add multiple samples at once using a numpy array
simulate_at : Simulate a specific sample by its index
Parameter : Class defining model parameters with bounds and properties
"""

assert all(
val in self.values.columns for val in param_dict.keys()
), "Missing parameters in columns"

self.sample.add_samples(
np.array([[param_dict[val] for val in self.values.columns]])
)
self.simulate_at(idx=self.values.index[-1], simulation_kwargs=simulation_kwargs)

def simulate_at(
self,
idx: int | list[int] | np.ndarray | slice = None,
Expand Down Expand Up @@ -961,7 +1218,7 @@ def plot_pcp(
)


class RealSampler(Sampler, ABC):
class RealSampler(Sampler):
"""
Abstract base class for samplers that only support real-valued parameters.

Expand Down
2 changes: 1 addition & 1 deletion requirements/install-min.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ scipy==1.15.0
matplotlib==3.5.1
plotly==5.10.0
pymoo==0.6.0.1
scikit-learn==1.2.2
scikit-learn>=1.3.0
fmpy==0.3.6
SALib==1.5.1
fastprogress==1.0.3
Expand Down
43 changes: 43 additions & 0 deletions tests/test_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,32 @@ def test_sample_methods(self):
fig = sample.plot_sample("res")
assert fig

ref = pd.Series(
[123, 456],
index=pd.date_range("2009-01-02", periods=2, freq="d"),
name="ref",
)

sample.results[0].loc[pd.Timestamp("2009-01-02"), "res"] = 456
sample.results[1].loc[pd.Timestamp("2009-01-02"), "res"] = 43

score_res = sample.get_score_df("res", ref)

pd.testing.assert_frame_equal(
score_res,
pd.DataFrame(
{
"r2_score": {0: 1.0, 1: -2.19},
"nmbe": {0: -57.51, 1: 94.11},
"cv_rmse": {0: 115.02, 1: 188.23},
"mean_absolute_error": {0: 0.0, 1: 247.0},
"root_mean_squared_error": {0: 0.0, 1: 297.59},
"max_error": {0: 0.0, 1: 413.0},
}
).rename_axis(ref.index.freq),
atol=0.1,
)

sample._validate()

# Static Sample
Expand Down Expand Up @@ -423,6 +449,23 @@ def test_lhs_sampler(self):
check_exact=False,
)

sampler.append_sample_from_param_dict(
{
"param_1": 0,
"param_2": 0,
"param_3": 0,
}
)

assert sampler.values.iloc[-1, :].to_dict() == {
"param_1": 0.0,
"param_2": 0.0,
"param_3": 0.0,
}
assert sampler.results.iloc[-1].to_dict() == {
"res": {pd.Timestamp("2009-01-01 00:00:00"): 0.0}
}

# Static
sampler = LHSSampler(
parameters=REAL_PARAM,
Expand Down