Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 57 additions & 23 deletions corrai/fmu.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,37 @@
import shutil
import tempfile
import warnings
from pathlib import Path
from contextlib import contextmanager
from pathlib import Path

import fmpy
from fmpy import simulate_fmu
import pandas as pd
from fmpy import simulate_fmu
from sklearn.pipeline import Pipeline

from corrai.base.model import Model

MODEL_VARIABLES_TYPES_MAP = {
"Real": float,
"Float32": float,
"Float64": float,
"Integer": int,
"Int8": int,
"UInt8": int,
"Int16": int,
"UInt16": int,
"Int32": int,
"UInt32": int,
"Int64": int,
"UInt64": int,
"Boolean": bool,
"String": str,
# Not yet implemented
# ('Enumeration'),
# ('Binary'),
# ('Clock'),
}

DEFAULT_SIMULATION_OPTIONS = {
"startTime": 0,
"stopTime": 24 * 3600,
Expand Down Expand Up @@ -297,6 +318,26 @@ def __init__(
UserWarning,
stacklevel=2,
)
self.property_dict = self._get_init_property_dict()

def _get_init_property_dict(self):
model_description = fmpy.read_model_description(self.fmu_path.as_posix())
fmu_param_dict = {
var.name: var
for var in model_description.modelVariables
if var.causality == "parameter"
}
prop_dict = {}
for name, var in fmu_param_dict.items():
try:
prop_dict[name] = MODEL_VARIABLES_TYPES_MAP[var.type](var.start)
except TypeError:
prop_dict[name] = var.start

except KeyError:
pass

return prop_dict

def get_property_values(
self, property_list: str | tuple[str, ...] | list[str]
Expand All @@ -323,22 +364,13 @@ def get_property_values(
>>> model.get_property_values(["x.k", "y.k"])
['2.0', '2.0']
"""
if isinstance(property_list, str):
property_list = (property_list,)
property_list = (
[property_list] if isinstance(property_list, str) else property_list
)
return [self.property_dict[prop] for prop in property_list]

model_description = fmpy.read_model_description(self.fmu_path.as_posix())
variable_map = {var.name: var for var in model_description.modelVariables}
values = []
for prop in property_list:
if prop in variable_map:
try:
val = float(variable_map[prop].start)
except (ValueError, TypeError):
val = variable_map[prop].start
values.append(val)
else:
values.append(None)
return values
def set_property_values(self, property_dict: dict):
self.property_dict.update(property_dict.items())

def simulate(
self,
Expand Down Expand Up @@ -440,10 +472,12 @@ def simulate(
2.0 3.0

"""
property_dict = dict(property_dict or {})

simu_property = self.property_dict.copy()
simu_property.update(dict(property_dict or {}))

if property_dict and debug_param:
print(property_dict)
print(simu_property)

simulation_options = {
**DEFAULT_SIMULATION_OPTIONS,
Expand All @@ -463,8 +497,8 @@ def simulate(
)

boundary_df = None
if property_dict:
boundary_df = property_dict.pop("boundary", boundary_df)
if simu_property:
boundary_df = simu_property.pop("boundary", boundary_df)

if simulation_options:
sim_boundary = simulation_options.pop("boundary", boundary_df)
Expand Down Expand Up @@ -500,7 +534,7 @@ def simulate(
local_boundary,
):
if local_boundary is not None and self.boundary_table_name:
property_dict[f"{self.boundary_table_name}.fileName"] = (
simu_property[f"{self.boundary_table_name}.fileName"] = (
local_boundary.as_posix()
)

Expand All @@ -510,7 +544,7 @@ def simulate(
stop_time=stop_sec,
step_size=step_sec,
relative_tolerance=simulation_options["tolerance"],
start_values=property_dict,
start_values=simu_property,
output=self.output_list,
solver=simulation_options["solver"],
output_interval=output_int_sec,
Expand Down
6 changes: 3 additions & 3 deletions tests/test_fmu.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ def test_get_property_values(self):
fmu_path=PACKAGE_DIR / "rosen.fmu",
output_list=["res.showNumber"],
)
values = simu.get_property_values(("res.showNumber",))
assert isinstance(values, list)
assert len(values) == 1

vals = simu.get_property_values("x.k")
assert vals == [2.0]
Expand All @@ -154,6 +151,9 @@ def test_get_property_values(self):
vals = simu.get_property_values(["x.k", "y.k"])
assert vals == [2.0, 2.0]

simu.set_property_values({"x.k": 3.0})
assert simu.get_property_values("x.k") == [3.0]

def test_simulate_parallel(self):
simu = ModelicaFmuModel(
fmu_path=PACKAGE_DIR / "rosen.fmu",
Expand Down