20 changes: 0 additions & 20 deletions src/access/profiling/access_models.py
@@ -56,26 +56,6 @@ def get_component_logs(self, path: Path) -> dict[str, ProfilingLog]:
class RAM3Profiling(CylcRoseManager):
"""Handles profiling of ACCESS-rAM3 configurations."""

def __init__(self, work_dir: Path, archive_dir: Path, layout_variable: str):
super().__init__(work_dir, archive_dir)
self.layout_variable = layout_variable

def parse_ncpus(self, path: Path) -> int:
# this is a symlink
config_path = path / "log/rose-suite-run.conf"

if not config_path.is_file():
raise FileNotFoundError(f"Could not find suitable config file in {config_path}")

for line in config_path.read_text().split():
if not line.startswith("!!"):
keypair = line.split("=")
if keypair[0].strip() == self.layout_variable:
layout = keypair[1].split(",")
return int(layout[0].strip()) * int(layout[1].strip())

raise ValueError(f"Cannot find layout key, {self.layout_variable}, in {config_path}.")

@property
def known_parsers(self):
return {
101 changes: 41 additions & 60 deletions src/access/profiling/cylc_manager.py
@@ -5,8 +5,6 @@
from abc import ABC, abstractmethod
from pathlib import Path

import xarray as xr

from access.profiling.cylc_parser import CylcDBReader, CylcProfilingParser
from access.profiling.experiment import ProfilingLog
from access.profiling.manager import ProfilingManager
@@ -16,19 +14,19 @@


class CylcRoseManager(ProfilingManager, ABC):
"""Abstract base class to handle profiling data for Cylc Rose configurations."""
"""Abstract base class to handle profiling data for Cylc Rose configurations.

@abstractmethod
def parse_ncpus(self, path: Path) -> int:
"""Parses the number of CPUs used in a given Cylc experiment.
Args:
work_dir (Path): Working directory where profiling experiments will be generated and run.
archive_dir (Path): Directory where completed experiments will be archived.
layout_variable (str): Name of the variable in rose-suite-run.conf file that defines the layout.
"""

Args:
path (Path): Path to the Payu experiment directory. Must contain a rose-suite.conf file.
_layout_variable: str # Name of the variable in rose-suite-run.conf file that defines the layout.

Returns:
int: Number of CPUs used in the experiment. If multiple submodels are defined, returns the sum of their
cpus.
"""
def __init__(self, work_dir: Path, archive_dir: Path, layout_variable: str):
super().__init__(work_dir, archive_dir)
self._layout_variable = layout_variable

@property
@abstractmethod
Expand All @@ -39,55 +37,30 @@ def known_parsers(self) -> dict[str, ProfilingParser]:
dict[str, ProfilingParser]: a dictionary of known parsers with names as keys.
"""

def find_component_datasets(self, path: Path) -> dict[str, ProfilingLog]:
"""Returns available profiling logs for the components in the configuration.

Args:
path (Path): Path to the output directory.

Returns:
dict[str, ProfilingLog]: Dictionary mapping component names to their ProfilingLog instances.
def parse_ncpus(self, path: Path) -> int:
# this is a symlink
config_path = path / "log/rose-suite-run.conf"

Raises:
RuntimeError: If no logs could be found.
"""
datasets = {}
# matches <cycle> / <task> / NN / job.out e.g. 20220226T0000Z/Lismore_d1100_GAL9_um_fcst_000/NN/job.out
# NN is the last attempt
# job.out is the stdout
# this pattern is followed for all cylc workflows.
# as tasks of interest will likely have their own logging regions e.g. UM each task_cycle is
# treated as a "component" of the configuration.
for logfile in path.glob("*/*/NN/job.out"):
cycle, task = logfile.parts[-4:-2]
for parser_name, parser in self.known_parsers.items():
# parsers raise an error if the log doesn't contain valid data.
# skip log if parsing fails.
try:
datasets[f"{task}_cycle{cycle}_{parser_name}"] = ProfilingLog(logfile, parser).parse()
continue
except ValueError: # all the parsers raise a ValueError if they can't find matching data
pass
if not config_path.is_file():
raise FileNotFoundError(f"Could not find suitable config file in {config_path}")

if datasets == {}:
raise RuntimeError(f"Could not find any known logs in {path}")
for line in config_path.read_text().split():
if not line.startswith("!!"):
keypair = line.split("=")
if keypair[0].strip() == self._layout_variable:
layout = keypair[1].split(",")
return int(layout[0].strip()) * int(layout[1].strip())

return datasets
raise ValueError(f"Cannot find layout key, {self._layout_variable}, in {config_path}.")

def parse_profiling_data(self, path: Path) -> dict[str, xr.Dataset]:
"""Parses profiling data from a Cylc Rose experiment directory.
def profiling_logs(self, path: Path) -> dict[str, ProfilingLog]:
"""Returns all profiling logs from the specified path.

Args:
path (Path): Path to the Cylc Rose experiment directory.

path (Path): Path to the experiment directory.
Returns:
dict[str, xr.Dataset]: Dictionary mapping component names to their profiling datasets.

Raises:
FileNotFoundError: If the suite log or cylc-suite.db files are missing.
RuntimeError: If the expected cylc task table is not found in the cylc-suite.db file.
dict[str, ProfilingLog]: Dictionary of profiling logs.
"""
datasets = {}
logs = {}

# setup log paths
@@ -99,12 +72,20 @@ def parse_profiling_data(self, path: Path) -> dict[str, xr.Dataset]:
# cylcdb.read_text = lambda x: x # hack to make log work
logs["cylc_tasks"] = ProfilingLog(cylcdb, CylcDBReader())

for name, log in logs.items():
logger.info(f"Parsing {name} profiling log: {log.filepath}.")
datasets[name] = log.parse()
logger.info(" Done.")
# Search for available profiling logs for the components in the configuration.
# matches <cycle> / <task> / NN / job.out e.g. 20220226T0000Z/Lismore_d1100_GAL9_um_fcst_000/NN/job.out
# NN is the last attempt
# job.out is the stdout
# this pattern is followed for all cylc workflows.
# as tasks of interest will likely have their own logging regions e.g. UM each task_cycle is
# treated as a "component" of the configuration.
possible_component_logs = list(jobdir.glob("*/*/NN/job.out"))
if not possible_component_logs:
raise RuntimeError(f"Could not find any known logs in {jobdir}")

# find known component datasets
datasets.update(self.find_component_datasets(jobdir))
for logfile in possible_component_logs:
cycle, task = logfile.parts[-4:-2]
for parser_name, parser in self.known_parsers.items():
logs[f"{task}_cycle{cycle}_{parser_name}"] = ProfilingLog(logfile, parser, optional=True)

return datasets
return logs
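
To make the layout arithmetic in parse_ncpus concrete, here is a minimal standalone sketch of the same parsing logic. The function name, the variable name ATMOS_LAYOUT and the file contents are hypothetical; in the manager the key is whatever layout_variable was passed to the constructor, and the file is the log/rose-suite-run.conf symlink inside the experiment directory.

from pathlib import Path

def ncpus_from_rose_conf(config_path: Path, layout_variable: str) -> int:
    # Mirror parse_ncpus: skip "!!"-disabled settings, find the layout key,
    # and multiply the first two comma-separated values of its decomposition.
    for line in config_path.read_text().splitlines():
        if line.startswith("!!"):
            continue
        key, _, value = line.partition("=")
        if key.strip() == layout_variable:
            nprocx, nprocy = (int(v.strip()) for v in value.split(",")[:2])
            return nprocx * nprocy
    raise ValueError(f"Cannot find layout key, {layout_variable}, in {config_path}.")

# Hypothetical example: a conf line "ATMOS_LAYOUT=4,8" gives 4 * 8 = 32 CPUs.

Under the naming scheme in profiling_logs, the job.out from the example path in the comment would be keyed roughly as Lismore_d1100_GAL9_um_fcst_000_cycle20220226T0000Z_<parser name>.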
11 changes: 10 additions & 1 deletion src/access/profiling/experiment.py
@@ -21,14 +21,23 @@ class ProfilingLog:
Args:
filepath (Path): Path to the log file.
parser (ProfilingParser): Parser to use for this log file.
optional (bool): Whether this log might be missing or does not contain parsable data. If True, no error should
be raised if the log is missing or unparsable. Defaults to False.
"""

filepath: Path # Path to the log file
parser: ProfilingParser # Parser to use for this log file
_optional: bool = False # Whether this log might not be present

def __init__(self, filepath: Path, parser: ProfilingParser):
def __init__(self, filepath: Path, parser: ProfilingParser, optional: bool = False) -> None:
self.filepath = filepath
self.parser = parser
self._optional = optional

@property
def optional(self) -> bool:
"""bool: Whether this log might not be present."""
return self._optional

def parse(self) -> xr.Dataset:
"""Parses the log file and returns the profiling data as an xarray Dataset.
92 changes: 70 additions & 22 deletions src/access/profiling/manager.py
@@ -9,7 +9,7 @@
import xarray as xr
from matplotlib.figure import Figure

from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus
from access.profiling.experiment import ProfilingExperiment, ProfilingExperimentStatus, ProfilingLog
from access.profiling.metrics import ProfilingMetric
from access.profiling.scaling import plot_scaling_metrics

@@ -31,7 +31,9 @@ class ProfilingManager(ABC):
work_dir: Path # Working directory where profiling experiments will be generated and run.
archive_dir: Path # Directory where completed experiments will be archived.
experiments: dict[str, ProfilingExperiment] # Dictionary storing ProfilingExperiment instances.
data: dict[str, xr.Dataset] # Dictionary mapping component names to their profiling datasets.
data: dict[
str, dict[str, xr.Dataset]
] # Dictionary mapping experiments to component names and their profiling datasets.

def __init__(self, work_dir: Path, archive_dir: Path):
super().__init__()
@@ -68,14 +70,14 @@ def __repr__(self) -> str:
return summary

@abstractmethod
def parse_profiling_data(self, path: Path) -> dict[str, xr.Dataset]:
"""Parses profiling data from the specified path.
def profiling_logs(self, path: Path) -> dict[str, ProfilingLog]:
"""Returns all profiling logs from the specified path.

Args:
path (Path): Path to the experiment directory.

Returns:
dict[str, xr.Dataset]: Dictionary mapping component names to their profiling datasets.
dict[str, ProfilingLog]: Dictionary of profiling logs.
"""

@abstractmethod
@@ -153,31 +155,40 @@ def delete_experiment(self, name: str) -> None:
else:
logger.warning(f"Experiment '{name}' not found; cannot delete.")

def parse_scaling_data(self):
def parse_profiling_data(self):
"""Parses profiling data from the experiments."""
self.data = {}
for exp in self.experiments.values():
for exp_name, exp in self.experiments.items():
if exp.status == ProfilingExperimentStatus.DONE or exp.status == ProfilingExperimentStatus.ARCHIVED:
logger.info(f"Parsing profiling data for experiment '{exp_name}'.")
self.data[exp_name] = {}
with exp.directory() as exp_path:
datasets = self.parse_profiling_data(exp_path)

# Find number of cpus used
ncpus = self.parse_ncpus(exp_path)

# Add ncpus dimension and concatenate with existing data
for name, ds in datasets.items():
ds = ds.expand_dims({"ncpus": 1}).assign_coords({"ncpus": [ncpus]})
if name in self.data:
self.data[name] = xr.concat([self.data[name], ds], dim="ncpus", join="outer").sortby("ncpus")
else:
self.data[name] = ds
# Parse all logs
logs = self.profiling_logs(exp_path)
for log_name, log in logs.items():
logger.info(f"Parsing {log_name} profiling log: {log.filepath}. ")
if log.optional:
try:
self.data[exp_name][log_name] = log.parse()
except FileNotFoundError:
logger.info(f"Optional profiling log '{log.filepath}' not found. Skipping.")
continue
else:
self.data[exp_name][log_name] = log.parse()
logger.info(" Done.")
else:
logger.warning(
f"Experiment '{exp_name}' is not completed (status: {exp.status.name}). Skipping parsing profiling "
"data."
)

def plot_scaling_data(
self,
components: list[str],
regions: list[list[str]],
metric: ProfilingMetric,
region_relabel_map: dict | None = None,
experiments: list[str] | None = None,
) -> Figure:
"""Plots scaling data for the specified components, regions and metric.

@@ -187,6 +198,43 @@ def plot_scaling_data(
metric (ProfilingMetric): Metric to use for the scaling plots.
region_relabel_map (dict | None): Optional mapping to relabel regions in the plots.
"""
return plot_scaling_metrics(
[self.data[c] for c in components], regions, metric, region_relabel_map=region_relabel_map
)

# Find number of cpus used for each experiment
ncpus = {}
for exp_name in self.data:
with self.experiments[exp_name].directory() as exp_path:
# Find number of cpus used
ncpus[exp_name] = self.parse_ncpus(exp_path)

# Gather scaling data for each component
scaling_data = []
for component, component_regions in zip(components, regions, strict=True):
component_data = None
for exp_name in self.data:
# Skip experiments not in the specified list
if experiments is not None and exp_name not in experiments:
continue

ds = self.data[exp_name].get(component)
if ds is None:
raise ValueError(f"No profiling data found for component '{component}' in experiment '{exp_name}'.")

# Select only the desired regions
ds = ds.sel(region=component_regions)

# Relabel regions if a relabel map is provided
if region_relabel_map is not None:
ds = ds.assign_coords(region=[region_relabel_map.get(n, n) for n in ds.region.values])

# Add ncpus dimension
ds = ds.expand_dims({"ncpus": 1}).assign_coords({"ncpus": [ncpus[exp_name]]})

# Concatenate data along ncpus dimension
if component_data is None:
component_data = ds
else:
component_data = xr.concat([component_data, ds], dim="ncpus", join="outer").sortby("ncpus")

scaling_data.append(component_data)

return plot_scaling_metrics(scaling_data, metric)
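
The per-experiment loop above boils down to the following xarray pattern; the variable name walltime, the region name and the CPU counts are synthetic, purely to show the expand_dims / assign_coords / concat / sortby sequence used for the ncpus dimension.

import xarray as xr

component_data = None
# Hypothetical walltimes for one region, from experiments run on 64, 32 and 128 CPUs.
for ncpus, walltime in [(64, 120.0), (32, 210.0), (128, 75.0)]:
    ds = xr.Dataset({"walltime": ("region", [walltime])}, coords={"region": ["um_fcst"]})
    ds = ds.expand_dims({"ncpus": 1}).assign_coords({"ncpus": [ncpus]})
    if component_data is None:
        component_data = ds
    else:
        component_data = xr.concat([component_data, ds], dim="ncpus", join="outer").sortby("ncpus")

print(component_data.walltime.sel(region="um_fcst").values)  # sorted by ncpus: [210. 120. 75.]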
21 changes: 5 additions & 16 deletions src/access/profiling/payu_manager.py
@@ -5,7 +5,6 @@
from abc import ABC, abstractmethod
from pathlib import Path

import xarray as xr
from access.config import YAMLParser
from experiment_runner.experiment_runner import ExperimentRunner

@@ -156,17 +155,13 @@ def parse_ncpus(self, path: Path) -> int:
else:
return payu_config["ncpus"]

def parse_profiling_data(self, path: Path) -> dict[str, xr.Dataset]:
"""Parses profiling data from a Payu experiment directory.

def profiling_logs(self, path: Path) -> dict[str, ProfilingLog]:
"""Returns all profiling logs from the specified path.
Args:
path (Path): Path to the Payu experiment directory.
path (Path): Path to the experiment directory.
Returns:
dict[str, xr.Dataset]: Dictionary mapping component names to their profiling datasets.
Raises:
FileNotFoundError: If the archive or output directories are missing.
dict[str, ProfilingLog]: Dictionary of profiling logs.
"""
datasets = {}
logs = {}

# Check archive directory exists
@@ -189,10 +184,4 @@ def parse_profiling_data(self, path: Path) -> dict[str, xr.Dataset]:
logger.warning(f"Multiple output directories found in {path}! Using the first one found.")
logs.update(self.get_component_logs(matches[0]))

# Parse all logs
for name, log in logs.items():
logger.info(f"Parsing {name} profiling log: {log.filepath}. ")
datasets[name] = log.parse()
logger.info(" Done.")

return datasets
return logs
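
Taken together, the split between profiling_logs (collect), parse_profiling_data (parse) and plot_scaling_data (plot) suggests a workflow along these lines. This is a hedged sketch only: the directories, layout variable, component key, region names and the ProfilingMetric member are all hypothetical, and it assumes RAM3Profiling now relies on the base-class constructor and that the experiments have already run to DONE or ARCHIVED.

from pathlib import Path

from access.profiling.access_models import RAM3Profiling
from access.profiling.metrics import ProfilingMetric

# Hypothetical directories and layout variable name.
manager = RAM3Profiling(Path("work"), Path("archive"), layout_variable="ATMOS_LAYOUT")

# Collects profiling_logs() for every completed experiment and parses each log,
# skipping optional logs that are missing.
manager.parse_profiling_data()

# Component keys follow "<task>_cycle<cycle>_<parser>"; the names below are illustrative only.
fig = manager.plot_scaling_data(
    components=["um_fcst_000_cycle20220226T0000Z_um"],
    regions=[["Atmos_Model_Init", "Atmos_Model_Run"]],
    metric=ProfilingMetric.WALLTIME,  # hypothetical member name
)
fig.savefig("scaling.png")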