From def462ab7047deb124656eba533a3e6ac7f58036 Mon Sep 17 00:00:00 2001
From: Matt Field
Date: Thu, 2 Oct 2025 13:07:30 +0100
Subject: [PATCH 1/5] Add artifact snapshotting and tests

---
 simvue/api/objects/artifact/file.py | 19 ++++++++++
 simvue/run.py                       |  4 ++
 tests/functional/test_run_class.py  | 24 +++++++++++-
 tests/unit/test_file_artifact.py    | 57 +++++++++++++++++++++++++++++
 4 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py
index 664dadde..810c14db 100644
--- a/simvue/api/objects/artifact/file.py
+++ b/simvue/api/objects/artifact/file.py
@@ -4,6 +4,9 @@
 import pydantic
 import os
 import pathlib
+import shutil
+from simvue.config.user import SimvueConfiguration
+import uuid
 
 from simvue.models import NAME_REGEX
 from simvue.utilities import get_mimetype_for_file, get_mimetypes, calculate_sha256
@@ -39,6 +42,7 @@ def new(
         metadata: dict[str, typing.Any] | None,
         upload_timeout: int | None = None,
         offline: bool = False,
+        snapshot: bool = False,
         **kwargs,
     ) -> Self:
         """Create a new artifact either locally or on the server
@@ -61,6 +65,8 @@ def new(
             specify the timeout in seconds for upload
         offline : bool, optional
             whether to define this artifact locally, default is False
+        snapshot : bool, optional
+            whether to create a snapshot of this file before uploading it, default is False
         """
         _mime_type = mime_type or get_mimetype_for_file(file_path)
@@ -73,6 +79,19 @@ def new(
             _file_checksum = kwargs.pop("checksum")
         else:
             file_path = pathlib.Path(file_path)
+            if snapshot:
+                _user_config = SimvueConfiguration.fetch()
+
+                _local_staging_dir: pathlib.Path = _user_config.offline.cache.joinpath(
+                    "artifacts"
+                )
+                _local_staging_dir.mkdir(parents=True, exist_ok=True)
+                _local_staging_file = _local_staging_dir.joinpath(
+                    f"{uuid.uuid4()}.file"
+                )
+                shutil.copy(file_path, _local_staging_file)
+                file_path = _local_staging_file
+
             _file_size = file_path.stat().st_size
             _file_orig_path = file_path.expanduser().absolute()
             _file_checksum = calculate_sha256(f"{file_path}", is_file=True)
diff --git a/simvue/run.py b/simvue/run.py
index c0904d78..6e37aff9 100644
--- a/simvue/run.py
+++ b/simvue/run.py
@@ -1632,6 +1632,7 @@
         category: typing.Literal["input", "output", "code"],
         file_type: str | None = None,
         preserve_path: bool = False,
+        snapshot: bool = False,
         name: typing.Optional[
             typing.Annotated[str, pydantic.Field(pattern=NAME_REGEX)]
         ] = None,
@@ -1652,6 +1653,8 @@
             the MIME file type else this is deduced, by default None
         preserve_path : bool, optional
             whether to preserve the path during storage, by default False
+        snapshot : bool, optional
+            whether to take a snapshot of the file before uploading, by default False
         name : str, optional
             name to associate with this file, by default None
         metadata : str | None, optional
@@ -1686,6 +1689,7 @@
                 offline=self._user_config.run.mode == "offline",
                 mime_type=file_type,
                 metadata=metadata,
+                snapshot=snapshot,
             )
             _artifact.attach_to_run(self.id, category)
         except (ValueError, RuntimeError) as e:
diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py
index dd82846a..4a0d54dc 100644
--- a/tests/functional/test_run_class.py
+++ b/tests/functional/test_run_class.py
@@ -809,6 +809,9 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:
 
 
 @pytest.mark.run
+@pytest.mark.parametrize(
+    "snapshot", (True, False)
+)
 @pytest.mark.parametrize(
     "valid_mimetype,preserve_path,name,allow_pickle,empty_file,category",
     [
@@ -827,6 +830,7 @@ def test_save_file_online(
     allow_pickle: bool,
     empty_file: bool,
     category: typing.Literal["input", "output", "code"],
+    snapshot: bool,
     capfd,
     request,
 ) -> None:
@@ -860,6 +864,7 @@ def test_save_file_online(
             file_type=file_type,
             preserve_path=preserve_path,
             name=name,
+            snapshot=snapshot
         )
     else:
         with pytest.raises(RuntimeError):
@@ -891,6 +896,9 @@
 
 @pytest.mark.run
 @pytest.mark.offline
+@pytest.mark.parametrize(
+    "snapshot", (True, False)
+)
 @pytest.mark.parametrize(
     "preserve_path,name,allow_pickle,empty_file,category",
     [
@@ -908,6 +916,7 @@ def test_save_file_offline(
     name: str | None,
     allow_pickle: bool,
     empty_file: bool,
+    snapshot: bool,
     category: typing.Literal["input", "output", "code"],
     capfd,
 ) -> None:
@@ -927,7 +936,15 @@ def test_save_file_offline(
             file_type=file_type,
             preserve_path=preserve_path,
             name=name,
+            snapshot=snapshot
         )
+        # If snapshotting, check the file can be changed without affecting the stored contents
+        if snapshot:
+            with open(
+                (out_name := pathlib.Path(tempd).joinpath("test_file.txt")),
+                "w",
+            ) as out_f:
+                out_f.write("updated file!")
         sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
         os.remove(out_name)
         client = sv_cl.Client()
@@ -944,8 +961,11 @@ def test_save_file_offline(
             name=f"{name or stored_name}",
             output_dir=tempd,
         )
-        assert out_loc.joinpath(name or out_name.name).exists()
-
+        assert out_file.exists()
+        with open(out_file, "r") as out_f:
+            content = out_f.read()
+        assert content == "test data entry"
+
 
 @pytest.mark.run
 def test_update_tags_running(
diff --git a/tests/unit/test_file_artifact.py b/tests/unit/test_file_artifact.py
index c442086c..b4b9bea6 100644
--- a/tests/unit/test_file_artifact.py
+++ b/tests/unit/test_file_artifact.py
@@ -93,4 +93,61 @@ def test_file_artifact_creation_offline(offline_cache_setup) -> None:
     assert _content == f"Hello World! {_uuid}"
     _run.delete()
     _folder.delete()
+
+
+@pytest.mark.api
+@pytest.mark.offline
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_offline_snapshot(offline_cache_setup, snapshot) -> None:
+    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
+    _folder_name = f"/simvue_unit_testing/{_uuid}"
+    _folder = Folder.new(path=_folder_name, offline=True)
+    _run = Run.new(name=f"test_file_artifact_creation_offline_snapshot_{_uuid}", folder=_folder_name, offline=True)
+
+    _path = pathlib.Path(offline_cache_setup.name).joinpath("hello_world.txt")
+
+    with _path.open("w") as out_f:
+        out_f.write(f"Hello World! {_uuid}")
+
+    _folder.commit()
+    _run.commit()
+    _artifact = FileArtifact.new(
+        name=f"test_file_artifact_{_uuid}",
+        file_path=_path,
+        storage=None,
+        mime_type=None,
+        offline=True,
+        metadata=None,
+        snapshot=snapshot
+    )
+    _artifact.attach_to_run(_run._identifier, category="input")
+
+    with _artifact._local_staging_file.open() as in_f:
+        _local_data = json.load(in_f)
+
+    assert _local_data.get("name") == f"test_file_artifact_{_uuid}"
+    assert _local_data.get("runs") == {_run._identifier: "input"}
+
+    # Change the file after the artifact is created, but before it is sent
+    with _path.open("w") as out_f:
+        out_f.write("File changed!")
+
+    if not snapshot:
+        with pytest.raises(RuntimeError):  # TODO: Make the sender resilient to this?
+            _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
+        return
+    else:
+        _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
+    time.sleep(1)
+
+    _online_artifact = Artifact(_id_mapping[_artifact.id])
+    assert _online_artifact.name == _artifact.name
+    _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
+    # Since it was snapshotted, this should be the state of the file before it was changed
+    assert _content == f"Hello World! {_uuid}"
+    _run.delete()
+    _folder.delete()

From 50a78f68e2a5e40e175c3ccdbaf94a92356a1038 Mon Sep 17 00:00:00 2001
From: Matt Field
Date: Thu, 2 Oct 2025 16:38:27 +0100
Subject: [PATCH 2/5] Add failed-upload handling to sender

---
 simvue/sender.py | 45 +++++++++++++++++++++++++++++++++++++++------
 1 file changed, 39 insertions(+), 6 deletions(-)

diff --git a/simvue/sender.py b/simvue/sender.py
index 0989cdbc..8f234200 100644
--- a/simvue/sender.py
+++ b/simvue/sender.py
@@ -37,11 +37,20 @@
 _logger = logging.getLogger(__name__)
 
 
+def _log_upload_failed(file_path):
+    with open(file_path, "r") as file:
+        _data = json.load(file)
+    _data["upload_failed"] = True
+    with open(file_path, "w") as file:
+        json.dump(_data, file)
+
+
 def upload_cached_file(
     cache_dir: pydantic.DirectoryPath,
     obj_type: str,
     file_path: pydantic.FilePath,
     id_mapping: dict[str, str],
+    retry_failed_uploads: bool,
     lock: threading.Lock,
 ):
     """Upload data stored in a cached file to the Simvue server.
 
     Parameters
     ----------
 
     _current_id = file_path.name.split(".")[0]
     _data = json.load(file_path.open())
     _exact_type: str = _data.pop("obj_type")
+
+    if _data.pop("upload_failed", False) and not retry_failed_uploads:
+        return
+
     try:
         _instance_class = getattr(simvue.api.objects, _exact_type)
-    except AttributeError as e:
-        raise RuntimeError(f"Attempt to initialise unknown type '{_exact_type}'") from e
+    except AttributeError:
+        _logger.error(f"Attempt to initialise unknown type '{_exact_type}'")
+        _log_upload_failed(file_path)
+        return
 
     # If it is an ObjectArtifact, need to load the object as bytes from a different file
     if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
         with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file:
             _data["serialized"] = file.read()
 
     # We want to reconnect if there is an online ID stored for this file
     if _online_id := id_mapping.get(_current_id):
         obj_for_upload = _instance_class(
             identifier=_online_id, _read_only=False, **_data
         )
     else:
         obj_for_upload = _instance_class.new(**_data)
 
     with lock:
         obj_for_upload.on_reconnect(id_mapping)
 
     try:
         if not issubclass(_instance_class, ArtifactBase):
             obj_for_upload.commit()
         _new_id = obj_for_upload.id
-    except RuntimeError as error:
+    except Exception as error:
         if "status 409" in error.args[0]:
             return
-        raise error
+        _logger.error(
+            f"Error while committing '{obj_for_upload.__class__.__name__}': {error.args[0]}"
+        )
+        _log_upload_failed(file_path)
+        return
     if not _new_id:
-        raise RuntimeError(
+        _logger.error(
             f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"
         )
+        _log_upload_failed(file_path)
+        return
+
     _logger.info(
         f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
     )
@@ -155,6 +177,7 @@ def sender(
     max_workers: int = 5,
     threading_threshold: int = 10,
     objects_to_upload: list[str] = UPLOAD_ORDER,
+    retry_failed_uploads: bool = False,
 ) -> dict[str, str]:
     """Send data from a local cache directory to the Simvue server.
@@ -168,6 +191,8 @@ def sender(
         The number of cached files above which threading will be used
     objects_to_upload : list[str]
         Types of objects to upload, by default uploads all types of objects present in cache
+    retry_failed_uploads : bool, optional
+        Whether to retry sending objects which previously failed, by default False
 
     Returns
     -------
@@ -203,7 +228,14 @@ def sender(
         _offline_files = _all_offline_files[_obj_type]
         if len(_offline_files) < threading_threshold:
             for file_path in _offline_files:
-                upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock)
+                upload_cached_file(
+                    cache_dir,
+                    _obj_type,
+                    file_path,
+                    _id_mapping,
+                    retry_failed_uploads,
+                    _lock,
+                )
         else:
             with ThreadPoolExecutor(
                 max_workers=max_workers, thread_name_prefix="sender_session_upload"
@@ -214,6 +246,7 @@ def sender(
                         obj_type=_obj_type,
                         file_path=file_path,
                         id_mapping=_id_mapping,
+                        retry_failed_uploads=retry_failed_uploads,
                         lock=_lock,
                     ),
                     _offline_files,

From e62c9f37037b739bc4675e417beeb9f1b84f8653 Mon Sep 17 00:00:00 2001
From: Matt Field
Date: Thu, 2 Oct 2025 17:17:13 +0100
Subject: [PATCH 3/5] Delete snapshot after upload

---
 simvue/api/objects/artifact/file.py |  4 ++++
 simvue/sender.py                    | 31 ++++++++++++------------
 tests/unit/test_file_artifact.py    | 37 +++++++++++++++++++++++------
 3 files changed, 49 insertions(+), 23 deletions(-)

diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py
index 810c14db..1264ce92 100644
--- a/simvue/api/objects/artifact/file.py
+++ b/simvue/api/objects/artifact/file.py
@@ -124,4 +124,8 @@ def new(
         with open(_file_orig_path, "rb") as out_f:
             _artifact._upload(file=out_f, timeout=upload_timeout, file_size=_file_size)
 
+        # If snapshot created, delete it after uploading
+        if pathlib.Path(_file_orig_path).parent == _artifact._local_staging_file.parent:
+            pathlib.Path(_file_orig_path).unlink()
+
         return _artifact
diff --git a/simvue/sender.py b/simvue/sender.py
index 8f234200..06c623de 100644
--- a/simvue/sender.py
+++ b/simvue/sender.py
@@ -86,39 +86,38 @@ def upload_cached_file(
     if issubclass(_instance_class, simvue.api.objects.ObjectArtifact):
         with open(file_path.parent.joinpath(f"{_current_id}.object"), "rb") as file:
             _data["serialized"] = file.read()
+    try:
+        # We want to reconnect if there is an online ID stored for this file
+        if _online_id := id_mapping.get(_current_id):
+            obj_for_upload = _instance_class(
+                identifier=_online_id, _read_only=False, **_data
+            )
+        else:
+            obj_for_upload = _instance_class.new(**_data)
 
-    # We want to reconnect if there is an online ID stored for this file
-    if _online_id := id_mapping.get(_current_id):
-        obj_for_upload = _instance_class(
-            identifier=_online_id, _read_only=False, **_data
-        )
-    else:
-        obj_for_upload = _instance_class.new(**_data)
-
-    with lock:
-        obj_for_upload.on_reconnect(id_mapping)
+        with lock:
+            obj_for_upload.on_reconnect(id_mapping)
 
-    try:
         if not issubclass(_instance_class, ArtifactBase):
             obj_for_upload.commit()
         _new_id = obj_for_upload.id
+
     except Exception as error:
         if "status 409" in error.args[0]:
             return
+
         _logger.error(
-            f"Error while committing '{obj_for_upload.__class__.__name__}': {error.args[0]}"
+            f"Error while committing '{_instance_class.__name__}': {error.args[0]}"
         )
         _log_upload_failed(file_path)
         return
 
     if not _new_id:
-        _logger.error(
-            f"Object of type '{obj_for_upload.__class__.__name__}' has no identifier"
-        )
+        _logger.error(f"Object of type '{_instance_class.__name__}' has no identifier")
         _log_upload_failed(file_path)
         return
 
     _logger.info(
-        f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {obj_for_upload.__class__.__name__} '{_new_id}'"
+        f"{'Updated' if id_mapping.get(_current_id) else 'Created'} {_instance_class.__name__} '{_new_id}'"
     )
 
     file_path.unlink(missing_ok=True)
diff --git a/tests/unit/test_file_artifact.py b/tests/unit/test_file_artifact.py
index b4b9bea6..e59867b9 100644
--- a/tests/unit/test_file_artifact.py
+++ b/tests/unit/test_file_artifact.py
@@ -10,10 +10,15 @@
 from simvue.api.objects.folder import Folder
 from simvue.sender import sender
 from simvue.client import Client
+import logging
 
 
 @pytest.mark.api
 @pytest.mark.online
-def test_file_artifact_creation_online() -> None:
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_online(offline_cache_setup, snapshot) -> None:
     _uuid: str = f"{uuid.uuid4()}".split("-")[0]
     _folder_name = f"/simvue_unit_testing/{_uuid}"
     _folder = Folder.new(path=_folder_name)
@@ -32,7 +37,8 @@
         file_path=_path,
         storage=None,
         mime_type=None,
-        metadata=None
+        metadata=None,
+        snapshot=snapshot
     )
     _artifact.attach_to_run(_run.id, "input")
     time.sleep(1)
@@ -45,6 +51,11 @@
     _content = b"".join(_artifact.download_content()).decode("UTF-8")
     assert _content == f"Hello World! {_uuid}"
     assert _artifact.to_dict()
+
+    # If snapshotting, check no local copy remains
+    if snapshot:
+        assert len(list(_artifact._local_staging_file.parent.iterdir())) == 0
+
     _run.delete()
     _folder.delete(recursive=True, delete_runs=True, runs_only=False)
     with contextlib.suppress(FileNotFoundError):
@@ -55,7 +66,11 @@
 
 @pytest.mark.api
 @pytest.mark.offline
-def test_file_artifact_creation_offline(offline_cache_setup) -> None:
+@pytest.mark.parametrize(
+    "snapshot",
+    (True, False)
+)
+def test_file_artifact_creation_offline(offline_cache_setup, snapshot) -> None:
     _uuid: str = f"{uuid.uuid4()}".split("-")[0]
     _folder_name = f"/simvue_unit_testing/{_uuid}"
     _folder = Folder.new(path=_folder_name, offline=True)
@@ -74,7 +89,8 @@
         storage=None,
         mime_type=None,
         offline=True,
-        metadata=None
+        metadata=None,
+        snapshot=snapshot
     )
     _artifact.attach_to_run(_run._identifier, category="input")
 
@@ -84,9 +100,15 @@
     assert _local_data.get("name") == f"test_file_artifact_{_uuid}"
     assert _local_data.get("runs") == {_run._identifier: "input"}
 
+    # If snapshotting, the staging area holds the artifact definition plus a copy of the file
+    assert len(list(_artifact._local_staging_file.parent.iterdir())) == (2 if snapshot else 1)
+
     _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
     time.sleep(1)
 
+    # Check file(s) deleted after upload
+    assert len(list(_artifact._local_staging_file.parent.iterdir())) == 0
+
     _online_artifact = Artifact(_id_mapping[_artifact.id])
     assert _online_artifact.name == _artifact.name
     _content = b"".join(_online_artifact.download_content()).decode("UTF-8")
@@ -101,11 +123,11 @@
     "snapshot",
     (True, False)
 )
-def test_file_artifact_creation_offline_snapshot(offline_cache_setup, snapshot) -> None:
+def test_file_artifact_creation_offline_updated(offline_cache_setup, caplog, snapshot) -> None:
     _uuid: str = f"{uuid.uuid4()}".split("-")[0]
     _folder_name = f"/simvue_unit_testing/{_uuid}"
     _folder = Folder.new(path=_folder_name, offline=True)
-    _run = Run.new(name=f"test_file_artifact_creation_offline_snapshot_{_uuid}", folder=_folder_name, offline=True)
+    _run = Run.new(name=f"test_file_artifact_creation_offline_updated_{_uuid}", folder=_folder_name, offline=True)
 
     _path = pathlib.Path(offline_cache_setup.name).joinpath("hello_world.txt")
@@ -136,8 +158,9 @@
         out_f.write("File changed!")
 
     if not snapshot:
-        with pytest.raises(RuntimeError):  # TODO: Make the sender resilient to this?
+        with caplog.at_level(logging.ERROR):
             _id_mapping = sender(pathlib.Path(offline_cache_setup.name), 1, 10)
+            assert "The SHA256 you specified did not match the calculated checksum." in caplog.text
         return
     else:

From 54e0dab4d99fe77311cf9c23a471fbf321647471 Mon Sep 17 00:00:00 2001
From: Matt Field
Date: Thu, 2 Oct 2025 17:39:11 +0100
Subject: [PATCH 4/5] Add sender tests

---
 tests/unit/test_sender.py | 185 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 185 insertions(+)
 create mode 100644 tests/unit/test_sender.py

diff --git a/tests/unit/test_sender.py b/tests/unit/test_sender.py
new file mode 100644
index 00000000..0c8555a0
--- /dev/null
+++ b/tests/unit/test_sender.py
@@ -0,0 +1,185 @@
+import contextlib
+import json
+import pytest
+import time
+import datetime
+import uuid
+from simvue.api.objects.run import RunBatchArgs
+from simvue.sender import sender
+from simvue.api.objects import Run, Metrics, Folder
+from simvue.client import Client
+from simvue.models import DATETIME_FORMAT
+import logging
+import pathlib
+import requests
+
+
+@pytest.mark.parametrize("retry_failed_uploads", (True, False))
+@pytest.mark.parametrize("parallel", (True, False))
+@pytest.mark.offline
+def test_sender_exception_handling(offline_cache_setup, caplog, retry_failed_uploads, parallel):
+    # Create something which will produce an error when sent, e.g. a metric with an invalid run ID
+    for i in range(5):
+        _metrics = Metrics.new(
+            run="invalid_run_id",
+            metrics=[
+                {
+                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
+                    "time": 1,
+                    "step": 1,
+                    "values": {"x": 1, "y": 2},
+                }
+            ],
+            offline=True
+        )
+        _metrics.commit()
+
+    with caplog.at_level(logging.ERROR):
+        sender(threading_threshold=1 if parallel else 10)
+
+    assert "Error while committing 'Metrics'" in caplog.text
+
+    # Wait, then try sending again
+    time.sleep(1)
+    caplog.clear()
+
+    with caplog.at_level(logging.ERROR):
+        sender(retry_failed_uploads=retry_failed_uploads, threading_threshold=1 if parallel else 10)
+
+    if retry_failed_uploads:
+        assert "Error while committing 'Metrics'" in caplog.text
+    else:
+        assert not caplog.text
+
+    # Check the files were not deleted
+    _offline_metric_paths = list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir())
+    assert len(_offline_metric_paths) == 5
+    # Check the files have 'upload_failed: True'
+    for _metric_path in _offline_metric_paths:
+        with open(_metric_path, "r") as _file:
+            _metric = json.load(_file)
+        assert _metric.get("upload_failed") is True
+
+@pytest.mark.parametrize("parallel", (True, False))
+def test_sender_server_ids(offline_cache_setup, caplog, parallel):
+    # Create an offline run
+    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
+    _path = f"/simvue_unit_testing/objects/folder/{_uuid}"
+    _folder = Folder.new(path=_path, offline=True)
+    _folder.commit()
+
+    _offline_run_ids = []
+
+    for i in range(5):
+        _name = f"test_sender_server_ids-{_uuid}-{i}"
+        _run = Run.new(name=_name, folder=_path, offline=True)
+        _run.commit()
+
+        _offline_run_ids.append(_run.id)
+
+        # Create a metric associated with the offline run ID
+        _metrics = Metrics.new(
+            run=_run.id,
+            metrics=[
+                {
+                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
+                    "time": 1,
+                    "step": 1,
+                    "values": {"x": i},
+                }
+            ],
+            offline=True
+        )
+        _metrics.commit()
+
+    # Send both items
+    with caplog.at_level(logging.ERROR):
+        sender(threading_threshold=1 if parallel else 10)
+
+    assert not caplog.text
+
+    # Check the server ID mapping was correctly created
+    _online_runs = []
+    for i, _offline_run_id in enumerate(_offline_run_ids):
+        _id_file = pathlib.Path(offline_cache_setup.name).joinpath("server_ids", f"{_offline_run_id}.txt")
+        assert _id_file.exists()
+        _online_id = _id_file.read_text()
+
+        # Check the correct ID is contained within the file
+        _online_run = Run(identifier=_online_id)
+        _online_runs.append(_online_run)
+        assert _online_run.name == f"test_sender_server_ids-{_uuid}-{i}"
+
+        # Check the metric has been associated with the correct online run
+        _run_metric = next(_online_run.metrics)
+        assert _run_metric[0] == 'x'
+        assert _run_metric[1]["count"] == 1
+        assert _run_metric[1]["min"] == i
+
+        # Create a new offline metric with the offline run ID
+        _metrics = Metrics.new(
+            run=_offline_run_id,
+            metrics=[
+                {
+                    "timestamp": datetime.datetime.now().strftime(DATETIME_FORMAT),
+                    "time": 2,
+                    "step": 2,
+                    "values": {"x": 2},
+                }
+            ],
+            offline=True
+        )
+        _metrics.commit()
+
+    # Run the sender again, check the online ID is correctly loaded from file and substituted for the offline ID
+    with caplog.at_level(logging.ERROR):
+        sender(threading_threshold=1 if parallel else 10)
+
+    assert not caplog.text
+
+    # Check the metric uploaded correctly
+    for _online_run in _online_runs:
+        _online_run.refresh()
+        _run_metric = next(_online_run.metrics)
+        assert _run_metric[0] == 'x'
+        assert _run_metric[1]["count"] == 2
+
+    # Check all files for runs and metrics were deleted once they were processed
+    assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("runs").iterdir())) == 0
+    assert len(list(pathlib.Path(offline_cache_setup.name).joinpath("metrics").iterdir())) == 0
+
+@pytest.mark.parametrize("parallel", (True, False))
+def test_send_heartbeat(offline_cache_setup, parallel, mocker):
+    # Create an offline run
+    _uuid: str = f"{uuid.uuid4()}".split("-")[0]
+    _path = f"/simvue_unit_testing/objects/folder/{_uuid}"
+    _folder = Folder.new(path=_path, offline=True)
+    _folder.commit()
+
+    _offline_runs = []
+
+    for i in range(5):
+        _name = f"test_send_heartbeat-{_uuid}-{i}"
+        _run = Run.new(name=_name, folder=_path, offline=True, heartbeat_timeout=1, status="running")
+        _run.commit()
+
+        _offline_runs.append(_run)
+
+    _id_mapping = sender(threading_threshold=1 if parallel else 10)
+    _online_runs = [Run(identifier=_id_mapping.get(_offline_run.id)) for _offline_run in _offline_runs]
+    assert all(_online_run.status == "running" for _online_run in _online_runs)
+
+    spy_put = mocker.spy(requests, "put")
+
+    # Send a heartbeat from each run every 0.5s for 5s
+    for i in range(10):
+        time.sleep(0.5)
+        [_offline_run.send_heartbeat() for _offline_run in _offline_runs]
+        sender(threading_threshold=1 if parallel else 10)
+
+    # Check requests.put() was called 50 times - once for each of the 5 runs, on all 10 iterations
+    assert spy_put.call_count == 50
+
+    # Get the online runs and check all are still running
+    [_online_run.refresh() for _online_run in _online_runs]
+    assert all(_online_run.status == "running" for _online_run in _online_runs)
\ No newline at end of file

From cd21ba6cf3b229065ae568d1bf67e9b164e8f810 Mon Sep 17 00:00:00 2001
From: Matt Field
Date: Fri, 3 Oct 2025 10:39:39 +0100
Subject: [PATCH 5/5] Address PR comments

---
 simvue/api/objects/artifact/file.py |  4 ++--
 simvue/sender.py                    | 28 +++++++++++++++++-----------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/simvue/api/objects/artifact/file.py b/simvue/api/objects/artifact/file.py
index 1264ce92..5cc41465 100644
--- a/simvue/api/objects/artifact/file.py
+++ b/simvue/api/objects/artifact/file.py
@@ -6,7 +6,7 @@
 import pathlib
 import shutil
 from simvue.config.user import SimvueConfiguration
-import uuid
+from datetime import datetime
 
 from simvue.models import NAME_REGEX
 from simvue.utilities import get_mimetype_for_file, get_mimetypes, calculate_sha256
@@ -87,7 +87,7 @@ def new(
                 )
                 _local_staging_dir.mkdir(parents=True, exist_ok=True)
                 _local_staging_file = _local_staging_dir.joinpath(
-                    f"{uuid.uuid4()}.file"
+                    f"{file_path.stem}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S_%f')[:-3]}.file"
                 )
                 shutil.copy(file_path, _local_staging_file)
                 file_path = _local_staging_file
diff --git a/simvue/sender.py b/simvue/sender.py
index 06c623de..ee8f2913 100644
--- a/simvue/sender.py
+++ b/simvue/sender.py
@@ -13,7 +13,6 @@
 import requests
 import psutil
 from simvue.config.user import SimvueConfiguration
-
 import simvue.api.objects
 from simvue.api.objects.artifact.base import ArtifactBase
 from simvue.eco.emissions_monitor import CO2Monitor
@@ -37,11 +36,18 @@
 _logger = logging.getLogger(__name__)
 
 
-def _log_upload_failed(file_path):
-    with open(file_path, "r") as file:
+def _log_upload_failed(file_path: pydantic.FilePath) -> None:
+    """Record that an object failed to upload in the object's offline cache file.
+
+    Parameters
+    ----------
+    file_path : pydantic.FilePath
+        The path to the offline cache file for the object
+    """
+    with file_path.open("r") as file:
         _data = json.load(file)
     _data["upload_failed"] = True
-    with open(file_path, "w") as file:
+    with file_path.open("w") as file:
         json.dump(_data, file)
 
 
@@ -52,7 +58,7 @@ def upload_cached_file(
     id_mapping: dict[str, str],
     retry_failed_uploads: bool,
     lock: threading.Lock,
-):
+) -> None:
     """Upload data stored in a cached file to the Simvue server.
 
     Parameters
     ----------
@@ -228,12 +234,12 @@ def sender(
         if len(_offline_files) < threading_threshold:
            for file_path in _offline_files:
                 upload_cached_file(
-                    cache_dir,
-                    _obj_type,
-                    file_path,
-                    _id_mapping,
-                    retry_failed_uploads,
-                    _lock,
+                    cache_dir=cache_dir,
+                    obj_type=_obj_type,
+                    file_path=file_path,
+                    id_mapping=_id_mapping,
+                    retry_failed_uploads=retry_failed_uploads,
+                    lock=_lock,
                 )
         else:
             with ThreadPoolExecutor(
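
A minimal usage sketch of the behaviour introduced in this series; the run
name, folder, and file path below are illustrative, and the simvue.Run
context manager with run.init(...) follows the library's existing client
API, so treat those details as assumptions rather than part of the patches:

    import pathlib

    import simvue
    from simvue.sender import sender

    # A file we want to store alongside the run
    results = pathlib.Path("results.csv")
    results.write_text("step,value\n0,1.0\n")

    with simvue.Run() as run:
        run.init(name="snapshot-example", folder="/examples")
        # snapshot=True copies the file into the offline staging area before
        # upload, so later edits to results.csv cannot change what is stored
        run.save_file(results, category="output", snapshot=True)

    # For offline runs, push the cached objects to the server afterwards;
    # retry_failed_uploads re-attempts any objects the sender previously
    # marked with 'upload_failed' in their cache files
    id_mapping = sender(retry_failed_uploads=True)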