From d1a26199d0148fb38fc7d266852be373f95c0e0b Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 16:13:13 -0400 Subject: [PATCH 01/22] fix post-processing of item_id for s1grd; add unit test --- datasets/sentinel-1-grd/s1grd.py | 3 -- datasets/sentinel-1-grd/test_s1grd.py | 65 ++++++++++++++++++++++++--- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/datasets/sentinel-1-grd/s1grd.py b/datasets/sentinel-1-grd/s1grd.py index 1ff95e6a..f48f6adb 100644 --- a/datasets/sentinel-1-grd/s1grd.py +++ b/datasets/sentinel-1-grd/s1grd.py @@ -152,9 +152,6 @@ def create_item( logger.error(f"Unexpected error processing {archive}: {str(e)}") return [] - # Remove checksum from id - item.id = "_".join(item.id.split("_")[0:-1]) - # Remove providers item.properties.pop("providers", None) diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index 48eb6bfc..a56c49a3 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -1,3 +1,4 @@ +import datetime import pathlib import pystac @@ -5,6 +6,7 @@ import s1grd from stactools.sentinel1.metadata_links import MetadataLinks from pctasks.core.storage import StorageFactory +from unittest import mock import pytest @@ -36,7 +38,6 @@ def test_metadata_links_annotation_pattern_parametrized( tmp_path, item_id: str, annotation_name: str, expected_key: str ): - # Setup: create a minimal manifest.safe with dataObjectSection and fileLocation archive_dir = tmp_path / item_id annotation_filename = f"{annotation_name}.xml" annotation_dir = archive_dir / "annotation" @@ -76,10 +77,8 @@ def test_get_item_storage(): asset_uri = "blob://sentinel1euwest/s1-grd/GRD/2023/6/20/EW/DH/S1A_EW_GRDM_1SDH_20230620T020009_20230620T020113_049063_05E665_5673/manifest.safe" # noqa: E501 storage_factory = StorageFactory() storage, path = s1grd.get_item_storage(asset_uri, storage_factory=storage_factory) - assert ( - path - == "GRD/2023/6/20/EW/DH/S1A_EW_GRDM_1SDH_20230620T020009_20230620T020113_049063_05E665.json" - ) # noqa: E501 + expected_path = "GRD/2023/6/20/EW/DH/S1A_EW_GRDM_1SDH_20230620T020009_20230620T020113_049063_05E665.json" # noqa: E501 + assert path == expected_path assert storage.root_uri == "blob://sentinel1euwest/s1-grd-stac" @@ -111,3 +110,59 @@ def test_rewrite_asset_hrefs(): "vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vv.tiff", } assert result == expected + + +@mock.patch("s1grd.create_item") +def test_s1grd_create_item_id_handling(mock_create_item): # noqa: E501 + # Setup test data + item_id_with_checksum = ( + "S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1" + ) + item_id_without_checksum = ( + "S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D" + ) + asset_uri = f"blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/{item_id_with_checksum}/manifest.safe" + + mock_item = pystac.Item( + id=item_id_without_checksum, + geometry={ + "type": "Polygon", + "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]], + }, + bbox=[0, 0, 1, 1], + datetime=datetime.datetime.now(), + properties={}, + ) + mock_item.assets = {} + mock_create_item.return_value = mock_item + + storage_factory = mock.MagicMock() + archive_storage = mock.MagicMock() + stac_item_storage = mock.MagicMock() + + storage_factory.get_storage.side_effect = [archive_storage, stac_item_storage] + archive_storage.list_files.return_value = [] + stac_item_storage.file_exists.return_value = False + + with ( + mock.patch("tempfile.TemporaryDirectory") as mock_temp_dir, + mock.patch("os.mkdir"), + mock.patch("os.path.isdir", return_value=False), + mock.patch("os.makedirs"), + mock.patch("s1grd.with_backoff"), + mock.patch("s1grd.fix_item", return_value=mock_item), + ): + # Configure temporary directory + mock_temp_dir.return_value.__enter__.return_value = "/tmp/mockdir" + + # Call function under test + result = s1grd.S1GRDCollection.create_item(asset_uri, storage_factory) + + mock_create_item.assert_called_once() + args, _ = mock_create_item.call_args + temp_dir_path = args[0] + assert item_id_with_checksum in temp_dir_path + + assert isinstance(result, list) + assert len(result) == 1 + assert result[0].id == item_id_without_checksum From d69ef6cb4ab69cfcba2b3152a8828aea93165ac7 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 16:28:35 -0400 Subject: [PATCH 02/22] format; test updates --- datasets/sentinel-1-grd/s1grd.py | 4 +- datasets/sentinel-1-grd/test_s1grd.py | 69 +++++++++++++-------------- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/datasets/sentinel-1-grd/s1grd.py b/datasets/sentinel-1-grd/s1grd.py index f48f6adb..d9e4957a 100644 --- a/datasets/sentinel-1-grd/s1grd.py +++ b/datasets/sentinel-1-grd/s1grd.py @@ -8,12 +8,12 @@ import requests import urllib3 from stactools.core.utils.antimeridian import Strategy, fix_item -from pctasks.core.storage.base import Storage from stactools.sentinel1.formats import Format from stactools.sentinel1.grd.stac import create_item from pctasks.core.models.task import WaitTaskResult from pctasks.core.storage import StorageFactory +from pctasks.core.storage.base import Storage from pctasks.core.utils.backoff import is_common_throttle_exception, with_backoff from pctasks.dataset.collection import Collection @@ -209,6 +209,6 @@ def rewrite_asset_hrefs( for asset in item.assets.values(): path = pathlib.Path(asset.href).relative_to(relative_to) - asset.href = storage.get_url(path) + asset.href = storage.get_url(path) # type: ignore return item diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index a56c49a3..4b5feeee 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -1,14 +1,15 @@ import datetime +import logging import pathlib -import pystac +from pathlib import Path +from unittest.mock import MagicMock, Mock, patch -import logging +import pystac +import pytest import s1grd from stactools.sentinel1.metadata_links import MetadataLinks -from pctasks.core.storage import StorageFactory -from unittest import mock -import pytest +from pctasks.core.storage import StorageFactory HERE = pathlib.Path(__file__).parent logging.basicConfig(level=logging.INFO) @@ -36,8 +37,8 @@ ], ) def test_metadata_links_annotation_pattern_parametrized( - tmp_path, item_id: str, annotation_name: str, expected_key: str -): + tmp_path: Path, item_id: str, annotation_name: str, expected_key: str +) -> None: archive_dir = tmp_path / item_id annotation_filename = f"{annotation_name}.xml" annotation_dir = archive_dir / "annotation" @@ -73,16 +74,15 @@ def test_metadata_links_annotation_pattern_parametrized( ) -def test_get_item_storage(): +def test_get_item_storage() -> None: asset_uri = "blob://sentinel1euwest/s1-grd/GRD/2023/6/20/EW/DH/S1A_EW_GRDM_1SDH_20230620T020009_20230620T020113_049063_05E665_5673/manifest.safe" # noqa: E501 storage_factory = StorageFactory() storage, path = s1grd.get_item_storage(asset_uri, storage_factory=storage_factory) expected_path = "GRD/2023/6/20/EW/DH/S1A_EW_GRDM_1SDH_20230620T020009_20230620T020113_049063_05E665.json" # noqa: E501 assert path == expected_path - assert storage.root_uri == "blob://sentinel1euwest/s1-grd-stac" -def test_rewrite_asset_hrefs(): +def test_rewrite_asset_hrefs() -> None: archive_storage = StorageFactory().get_storage( "blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1" # noqa: E501 ) @@ -98,30 +98,29 @@ def test_rewrite_asset_hrefs(): } expected = { - "safe-manifest": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/manifest.safe", - "schema-product-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/rfi-iw-vh.xml", - "schema-product-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/rfi/rfi-iw-vv.xml", - "schema-calibration-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/calibration-iw-vh.xml", - "schema-calibration-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/calibration-iw-vv.xml", - "schema-noise-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/noise-iw-vh.xml", - "schema-noise-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/noise-iw-vv.xml", - "thumbnail": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/preview/quick-look.png", - "vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vh.tiff", - "vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vv.tiff", - } + "safe-manifest": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/manifest.safe", # noqa: E501 + "schema-product-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/rfi-iw-vh.xml", # noqa: E501 + "schema-product-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/rfi/rfi-iw-vv.xml", # noqa: E501 + "schema-calibration-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/calibration-iw-vh.xml", # noqa: E501 + "schema-calibration-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/calibration-iw-vv.xml", # noqa: E501 + "schema-noise-vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/noise-iw-vh.xml", # noqa: E501 + "schema-noise-vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/annotation/calibration/noise-iw-vv.xml", # noqa: E501 + "thumbnail": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/preview/quick-look.png", # noqa: E501 + "vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vh.tiff", # noqa: E501 + "vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vv.tiff", # noqa: E501 + } # noqa: E501 assert result == expected -@mock.patch("s1grd.create_item") -def test_s1grd_create_item_id_handling(mock_create_item): # noqa: E501 - # Setup test data +@patch("s1grd.create_item") +def test_s1grd_create_item_id_handling(mock_create_item: Mock) -> None: item_id_with_checksum = ( "S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1" ) item_id_without_checksum = ( "S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D" ) - asset_uri = f"blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/{item_id_with_checksum}/manifest.safe" + asset_uri = f"blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/{item_id_with_checksum}/manifest.safe" # noqa: E501 mock_item = pystac.Item( id=item_id_without_checksum, @@ -136,26 +135,24 @@ def test_s1grd_create_item_id_handling(mock_create_item): # noqa: E501 mock_item.assets = {} mock_create_item.return_value = mock_item - storage_factory = mock.MagicMock() - archive_storage = mock.MagicMock() - stac_item_storage = mock.MagicMock() + storage_factory = MagicMock() + archive_storage = MagicMock() + stac_item_storage = MagicMock() storage_factory.get_storage.side_effect = [archive_storage, stac_item_storage] archive_storage.list_files.return_value = [] stac_item_storage.file_exists.return_value = False with ( - mock.patch("tempfile.TemporaryDirectory") as mock_temp_dir, - mock.patch("os.mkdir"), - mock.patch("os.path.isdir", return_value=False), - mock.patch("os.makedirs"), - mock.patch("s1grd.with_backoff"), - mock.patch("s1grd.fix_item", return_value=mock_item), + patch("tempfile.TemporaryDirectory") as mock_temp_dir, + patch("os.mkdir"), + patch("os.path.isdir", return_value=False), + patch("os.makedirs"), + patch("s1grd.with_backoff"), + patch("s1grd.fix_item", return_value=mock_item), ): - # Configure temporary directory mock_temp_dir.return_value.__enter__.return_value = "/tmp/mockdir" - # Call function under test result = s1grd.S1GRDCollection.create_item(asset_uri, storage_factory) mock_create_item.assert_called_once() From adcc640e92a9fbcb3aabba95c82d99f61bc50d59 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 16:29:21 -0400 Subject: [PATCH 03/22] format; test updates --- datasets/sentinel-1-grd/test_s1grd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index 4b5feeee..e9be3618 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -108,7 +108,7 @@ def test_rewrite_asset_hrefs() -> None: "thumbnail": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/preview/quick-look.png", # noqa: E501 "vh": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vh.tiff", # noqa: E501 "vv": "https://sentinel1euwest.blob.core.windows.net/s1-grd/GRD/2023/6/28/IW/DV/S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1/measurement/iw-vv.tiff", # noqa: E501 - } # noqa: E501 + } assert result == expected From 3a5448510684aaacf1935de7b99349896bacc37d Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 16:34:07 -0400 Subject: [PATCH 04/22] format; test updates --- datasets/sentinel-1-grd/test_s1grd.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index e9be3618..542e2b96 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -163,3 +163,8 @@ def test_s1grd_create_item_id_handling(mock_create_item: Mock) -> None: assert isinstance(result, list) assert len(result) == 1 assert result[0].id == item_id_without_checksum + + archive_storage.list_files.assert_called_once() + stac_item_storage.file_exists.assert_called_once_with( + f"GRD/2023/6/28/IW/DV/{item_id_without_checksum}.json" + ) From ba45006da0e478fabbe98e257107a892e8783ce4 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 16:53:17 -0400 Subject: [PATCH 05/22] fix unit test to ensure it can catch when logic issues pertaining to item_id occur --- datasets/sentinel-1-grd/test_s1grd.py | 60 +++++++++++++-------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index 542e2b96..bc4c2241 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -112,8 +112,20 @@ def test_rewrite_asset_hrefs() -> None: assert result == expected +@patch("s1grd.StorageFactory") +@patch("s1grd.get_item_storage") +@patch("s1grd.with_backoff") @patch("s1grd.create_item") -def test_s1grd_create_item_id_handling(mock_create_item: Mock) -> None: +def test_s1grd_create_item_id_handling( + mock_create_item: Mock, + mock_with_backoff: Mock, + mock_get_item_storage: Mock, + mock_storage_factory: Mock, +) -> None: + """ + Test that S1GRDCollection.create_item correctly handles the item_id + and does not incorrectly modify it during post-processing. + """ item_id_with_checksum = ( "S1A_IW_GRDH_1SDV_20230628T210705_20230628T210730_049191_05EA4D_21D1" ) @@ -122,6 +134,15 @@ def test_s1grd_create_item_id_handling(mock_create_item: Mock) -> None: ) asset_uri = f"blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/{item_id_with_checksum}/manifest.safe" # noqa: E501 + # Mock the storage and file interactions + mock_storage = MagicMock() + mock_get_item_storage.return_value = ( + mock_storage, + f"{item_id_without_checksum}.json", + ) + mock_storage_factory.get_storage.return_value = mock_storage + mock_storage.list_files.return_value = [] + mock_item = pystac.Item( id=item_id_without_checksum, geometry={ @@ -135,36 +156,15 @@ def test_s1grd_create_item_id_handling(mock_create_item: Mock) -> None: mock_item.assets = {} mock_create_item.return_value = mock_item - storage_factory = MagicMock() - archive_storage = MagicMock() - stac_item_storage = MagicMock() - - storage_factory.get_storage.side_effect = [archive_storage, stac_item_storage] - archive_storage.list_files.return_value = [] - stac_item_storage.file_exists.return_value = False - - with ( - patch("tempfile.TemporaryDirectory") as mock_temp_dir, - patch("os.mkdir"), - patch("os.path.isdir", return_value=False), - patch("os.makedirs"), - patch("s1grd.with_backoff"), - patch("s1grd.fix_item", return_value=mock_item), - ): - mock_temp_dir.return_value.__enter__.return_value = "/tmp/mockdir" - - result = s1grd.S1GRDCollection.create_item(asset_uri, storage_factory) - - mock_create_item.assert_called_once() - args, _ = mock_create_item.call_args - temp_dir_path = args[0] - assert item_id_with_checksum in temp_dir_path + result = s1grd.S1GRDCollection.create_item(asset_uri, mock_storage_factory) + # Validate the result assert isinstance(result, list) assert len(result) == 1 - assert result[0].id == item_id_without_checksum + assert ( + result[0].id == item_id_without_checksum + ), f"Expected item_id to be '{item_id_without_checksum}', but got '{result[0].id}'" - archive_storage.list_files.assert_called_once() - stac_item_storage.file_exists.assert_called_once_with( - f"GRD/2023/6/28/IW/DV/{item_id_without_checksum}.json" - ) + # Validate that the item was not incorrectly modified + mock_create_item.assert_called_once() + mock_storage.list_files.assert_called_once() From d9a119be6caf7e9ea94df8971a8503e07a2573d8 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Thu, 14 Aug 2025 17:01:10 -0400 Subject: [PATCH 06/22] fix unit test to ensure it can catch when logic issues pertaining to item_id occur --- datasets/sentinel-1-grd/test_s1grd.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/datasets/sentinel-1-grd/test_s1grd.py b/datasets/sentinel-1-grd/test_s1grd.py index bc4c2241..3d9963db 100644 --- a/datasets/sentinel-1-grd/test_s1grd.py +++ b/datasets/sentinel-1-grd/test_s1grd.py @@ -134,7 +134,6 @@ def test_s1grd_create_item_id_handling( ) asset_uri = f"blob://sentinel1euwest/s1-grd/GRD/2023/6/28/IW/DV/{item_id_with_checksum}/manifest.safe" # noqa: E501 - # Mock the storage and file interactions mock_storage = MagicMock() mock_get_item_storage.return_value = ( mock_storage, @@ -158,13 +157,11 @@ def test_s1grd_create_item_id_handling( result = s1grd.S1GRDCollection.create_item(asset_uri, mock_storage_factory) - # Validate the result assert isinstance(result, list) assert len(result) == 1 - assert ( - result[0].id == item_id_without_checksum - ), f"Expected item_id to be '{item_id_without_checksum}', but got '{result[0].id}'" + assert result[0].id == item_id_without_checksum, ( + f"Expected item_id to be '{item_id_without_checksum}', but got '{result[0].id}'" + ) - # Validate that the item was not incorrectly modified mock_create_item.assert_called_once() mock_storage.list_files.assert_called_once() From c65f1f1291c662a73617237c3427d53e6c7cca4a Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 13:37:08 -0400 Subject: [PATCH 07/22] test dedupe; test s3 docker --- datasets/sentinel-1-grd/dataset.yaml | 2 +- datasets/sentinel-3/Dockerfile | 12 +- datasets/sentinel-3/dataset.yaml | 2 +- .../ingest_task/pctasks/ingest_task/pgstac.py | 9 +- pctasks/ingest_task/tests/test_items.py | 152 ++++++++++++++++-- 5 files changed, 157 insertions(+), 20 deletions(-) diff --git a/datasets/sentinel-1-grd/dataset.yaml b/datasets/sentinel-1-grd/dataset.yaml index f3365181..35e232dc 100644 --- a/datasets/sentinel-1-grd/dataset.yaml +++ b/datasets/sentinel-1-grd/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel-1-grd -image: ${{ args.registry }}/pctasks-sentinel-1-grd:20250708.1 +image: ${{ args.registry }}/pctasks-sentinel-1-grd:20250814.1 args: - registry diff --git a/datasets/sentinel-3/Dockerfile b/datasets/sentinel-3/Dockerfile index 7bdffadf..eb5fad36 100644 --- a/datasets/sentinel-3/Dockerfile +++ b/datasets/sentinel-3/Dockerfile @@ -21,15 +21,15 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10 # See https://github.com/mapbox/rasterio/issues/1289 ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt -# Install Python 3.8 -RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" \ - && bash "Mambaforge-$(uname)-$(uname -m).sh" -b -p /opt/conda \ - && rm -rf "Mambaforge-$(uname)-$(uname -m).sh" +# Install Python 3.10 +RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" \ + && bash "Miniforge3-$(uname)-$(uname -m).sh" -b -p /opt/conda \ + && rm -rf "Miniforge3-$(uname)-$(uname -m).sh" ENV PATH /opt/conda/bin:$PATH ENV LD_LIBRARY_PATH /opt/conda/lib/:$LD_LIBRARY_PATH -RUN mamba install -y -c conda-forge python=3.8 gdal=3.3.3 pip setuptools cython numpy==1.21.5 +RUN mamba install -y -c conda-forge python=3.10 gdal libgdal-netcdf pip setuptools cython numpy RUN python -m pip install --upgrade pip @@ -66,7 +66,7 @@ RUN cd /opt/src/pctasks/dataset && \ pip install . COPY ./datasets/sentinel-3/requirements.txt /opt/src/datasets/sentinel-3/requirements.txt -RUN python3 -m pip install -r /opt/src/datasets/sentinel-3/requirements.txt +RUN python3 -m pip install -r /opt/src/datasets/sentinel-3/requirements.txt # Setup Python Path to allow import of test modules ENV PYTHONPATH=/opt/src:$PYTHONPATH diff --git a/datasets/sentinel-3/dataset.yaml b/datasets/sentinel-3/dataset.yaml index 7261428e..5dfeaeed 100644 --- a/datasets/sentinel-3/dataset.yaml +++ b/datasets/sentinel-3/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel-3 -image: ${{ args.registry }}/pctasks-sentinel-3:20230630.1 +image: ${{ args.registry }}/pctasks-sentinel-3:20250819.1 args: - registry diff --git a/pctasks/ingest_task/pctasks/ingest_task/pgstac.py b/pctasks/ingest_task/pctasks/ingest_task/pgstac.py index 09dcfe0b..b48f7762 100644 --- a/pctasks/ingest_task/pctasks/ingest_task/pgstac.py +++ b/pctasks/ingest_task/pctasks/ingest_task/pgstac.py @@ -54,16 +54,19 @@ def ingest_items( mode: Methods = Methods.upsert, insert_group_size: Optional[int] = None, ) -> None: + all_unique_items = list( + self.unique_items(items, lambda b: orjson.loads(b)["id"]) + ) if insert_group_size: - groups = grouped(items, insert_group_size) + groups = grouped(all_unique_items, insert_group_size) else: - groups = [items] + groups = [all_unique_items] for i, group in enumerate(groups): logger.info(f" ...Loading group {i + 1}") self._with_connection_retry( lambda: self.loader.load_items( - iter(self.unique_items(group, lambda b: orjson.loads(b)["id"])), + iter(group), insert_mode=mode, ) ) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index 05c8dad2..30449a44 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -1,7 +1,10 @@ import json import pathlib +from typing import Generator, List, Optional import orjson +import pytest +from unittest.mock import patch, MagicMock from pctasks.core.models.task import FailedTaskResult from pctasks.dev.mocks import MockTaskContext @@ -13,7 +16,9 @@ ) from pctasks.ingest_task.pgstac import PgSTAC from pctasks.ingest_task.task import ingest_task +from pctasks.core.utils import grouped from tests.conftest import ingest_test_environment +from pypgstac.load import Methods HERE = pathlib.Path(__file__).parent TEST_COLLECTION = HERE / "data-files/test_collection.json" @@ -22,6 +27,51 @@ TEST_DUPE_NDJSON = HERE / "data-files/items/items_dupe.ndjson" +@pytest.fixture +def pgstac_fixture(): + with ( + patch("pypgstac.db.PgstacDB") as MockPgstacDB, + patch("pypgstac.load.Loader") as MockLoader, + ): + mock_db = MockPgstacDB.return_value + mock_loader = MockLoader.return_value + + pgstac = PgSTAC("postgresql://dummy:dummy@localhost:5432/dummy") + yield pgstac + + +@pytest.fixture +def dupe_ndjson_lines() -> List[bytes]: + with open(TEST_DUPE_NDJSON, "r") as f: + return [line.strip().encode("utf-8") for line in f.readlines() if line.strip()] + + +@pytest.fixture +def duplicate_items() -> List[bytes]: + return [ + b'{"id": "item1", "data": "value1"}', + b'{"id": "item2", "data": "value2"}', + b'{"id": "item1", "data": "value1"}', # Duplicate of item1 + b'{"id": "item3", "data": "value3"}', + b'{"id": "item2", "data": "value2"}', # Duplicate of item2 + ] + + +@pytest.fixture +def capture_loader_calls(): + captured_calls = [] + + def mock_load_items(items_iter, insert_mode): + items_list = list(items_iter) + captured_calls.append({"items": items_list, "mode": insert_mode}) + return None + + mock_loader = MagicMock() + mock_loader.load_items.side_effect = mock_load_items + + return mock_loader, captured_calls + + def test_single_item_ingest(): """Test ingesting Items through the ingest task logic.""" @@ -154,17 +204,101 @@ def test_ingest_dupe_items_ndjson(): assert not isinstance(result, FailedTaskResult) -def test_unique_items_deduplication(): - pgstac = PgSTAC("postgresql://dummy:dummy@localhost:5432/dummy") +def test_unique_items_deduplication( + pgstac_fixture: Generator[PgSTAC, None, None], dupe_ndjson_lines: List[bytes] +) -> None: + unique_items = list( + pgstac_fixture.unique_items(dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]) + ) - with open(TEST_DUPE_NDJSON, "r") as f: - lines = [line.strip().encode("utf-8") for line in f.readlines() if line.strip()] + assert len(dupe_ndjson_lines) == 5 + assert len(unique_items) == 3 - unique_items = list(pgstac.unique_items(lines, lambda b: orjson.loads(b)["id"])) + unique_ids = [orjson.loads(item)["id"] for item in unique_items] + assert len(unique_ids) == 3 + assert unique_ids == {"item1", "item2", "item3"} - assert len(lines) == 5 - assert len(unique_items) == 3 +def test_unique_items_grouped_deduplication( + pgstac_fixture: Generator[PgSTAC, None, None], dupe_ndjson_lines: List[bytes] +) -> None: + unique_items = list( + pgstac_fixture.unique_items(dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]) + ) unique_ids = [orjson.loads(item)["id"] for item in unique_items] - assert len(set(unique_ids)) == 3 - assert set(unique_ids) == {"item1", "item2", "item3"} + + assert len(dupe_ndjson_lines) == 5 + assert len(unique_ids) == 3 + + groups = grouped(unique_items, size=3) + + seen_items = [] + for group in groups: + assert group + seen_items.extend(orjson.loads(item)["id"] for item in group) + + assert len(seen_items) == 3 + assert unique_ids == seen_items + + +@pytest.mark.parametrize( + "insert_group_size, mode", [(None, Methods.upsert), (2, Methods.insert)] +) +def test_ingest_items_deduplication_and_grouping( + pgstac_fixture: Generator[PgSTAC, None, None], + duplicate_items: List[bytes], + insert_group_size: Optional[int], + mode: Methods, +): + captured_groups = [] + modes_passed = [] + + # gets the groups from the actual code path to evaluate without side effects + def mock_load_items(items_iter, insert_mode): + items_list = list(items_iter) + captured_groups.append({"items": items_list}) + modes_passed.append(insert_mode) + return None + + pgstac_fixture.loader.load_items = mock_load_items + + pgstac_fixture.ingest_items( + duplicate_items, mode=Methods.upsert, insert_group_size=insert_group_size + ) + + expected_group_count = 1 if insert_group_size is None else insert_group_size + + assert len(captured_groups) == expected_group_count, ( + f"Expected {expected_group_count} groups" + ) + + all_items = [] + for group in captured_groups: + all_items.extend(group["items"]) + + assert len(all_items) == 3, "Should have 3 unique items after deduplication" + + all_ids = [orjson.loads(item)["id"] for item in all_items] + assert len(all_ids) == len(set(all_ids)), "No duplicates across groups" + assert set(all_ids) == {"item1", "item2", "item3"}, "All items correctly included" + + +@pytest.mark.parametrize("mode", [Methods.upsert, Methods.insert]) +def test_ingest_items_with_different_modes( + pgstac_fixture: Generator[PgSTAC, None, None], + duplicate_items: List[bytes], + mode: Methods, +) -> None: + modes_passed = [] + + def mock_load_items(items_iter, insert_mode): + modes_passed.append(insert_mode) + list(items_iter) + return None + + pgstac_fixture.loader.load_items = mock_load_items + + pgstac_fixture.ingest_items(duplicate_items, mode=mode) + + assert len(modes_passed) == 1, "load_items should be called once" + assert modes_passed[0] == mode, f"Mode should be {mode}" From a4abb264ba2979af9dd30438e339a5839b01c5b6 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 14:07:40 -0400 Subject: [PATCH 08/22] format --- pctasks/ingest_task/tests/test_items.py | 77 +++++++++++++++---------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index 30449a44..6a6a1927 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -1,12 +1,15 @@ import json import pathlib -from typing import Generator, List, Optional +from pathlib import Path +from typing import Dict, Generator, List, Optional, Tuple +from unittest.mock import MagicMock, Mock, patch import orjson import pytest -from unittest.mock import patch, MagicMock +from pypgstac.load import Methods from pctasks.core.models.task import FailedTaskResult +from pctasks.core.utils import grouped from pctasks.dev.mocks import MockTaskContext from pctasks.ingest.models import ( IngestNdjsonInput, @@ -16,9 +19,7 @@ ) from pctasks.ingest_task.pgstac import PgSTAC from pctasks.ingest_task.task import ingest_task -from pctasks.core.utils import grouped from tests.conftest import ingest_test_environment -from pypgstac.load import Methods HERE = pathlib.Path(__file__).parent TEST_COLLECTION = HERE / "data-files/test_collection.json" @@ -28,7 +29,7 @@ @pytest.fixture -def pgstac_fixture(): +def pgstac_fixture() -> Generator[Tuple[PgSTAC, Mock, Mock]]: with ( patch("pypgstac.db.PgstacDB") as MockPgstacDB, patch("pypgstac.load.Loader") as MockLoader, @@ -37,7 +38,7 @@ def pgstac_fixture(): mock_loader = MockLoader.return_value pgstac = PgSTAC("postgresql://dummy:dummy@localhost:5432/dummy") - yield pgstac + yield (pgstac, mock_db, mock_loader) @pytest.fixture @@ -58,10 +59,12 @@ def duplicate_items() -> List[bytes]: @pytest.fixture -def capture_loader_calls(): +def capture_loader_calls() -> Tuple[MagicMock, List[dict]]: captured_calls = [] - def mock_load_items(items_iter, insert_mode): + def mock_load_items( + items_iter: Generator[bytes, None, None], insert_mode: Methods + ) -> None: items_list = list(items_iter) captured_calls.append({"items": items_list, "mode": insert_mode}) return None @@ -72,7 +75,7 @@ def mock_load_items(items_iter, insert_mode): return mock_loader, captured_calls -def test_single_item_ingest(): +def test_single_item_ingest() -> None: """Test ingesting Items through the ingest task logic.""" task_context = MockTaskContext.default() @@ -97,7 +100,7 @@ def test_single_item_ingest(): assert not isinstance(result, FailedTaskResult) -def test_ndjson_ingest(): +def test_ndjson_ingest() -> None: """Test ingesting Items through the ingest task logic.""" task_context = MockTaskContext.default() @@ -120,7 +123,7 @@ def test_ndjson_ingest(): assert not isinstance(result, FailedTaskResult) -def test_ingest_ndjson_add_service_principal(): +def test_ingest_ndjson_add_service_principal() -> None: result = IngestTaskConfig.from_ndjson( ndjson_data=IngestNdjsonInput(ndjson_folder=NdjsonFolder(uri="test/")), add_service_principal=True, @@ -151,13 +154,13 @@ def test_ingest_ndjson_add_service_principal(): result = IngestTaskConfig.from_ndjson( ndjson_data=IngestNdjsonInput(ndjson_folder=NdjsonFolder(uri="test/")), ) - - assert "AZURE_TENANT_ID" not in result.environment - assert "AZURE_TENANT_ID" not in result.environment + assert result.environment is not None assert "AZURE_TENANT_ID" not in result.environment + assert "AZURE_CLIENT_ID" not in result.environment + assert "AZURE_CLIENT_SECRET" not in result.environment -def test_empty_ndjson_ingest(tmp_path): +def test_empty_ndjson_ingest(tmp_path: Path) -> None: """Test ingesting an empty item collection works.""" task_context = MockTaskContext.default() @@ -183,7 +186,7 @@ def test_empty_ndjson_ingest(tmp_path): assert not isinstance(result, FailedTaskResult) -def test_ingest_dupe_items_ndjson(): +def test_ingest_dupe_items_ndjson() -> None: task_context = MockTaskContext.default() with ingest_test_environment(): @@ -205,10 +208,13 @@ def test_ingest_dupe_items_ndjson(): def test_unique_items_deduplication( - pgstac_fixture: Generator[PgSTAC, None, None], dupe_ndjson_lines: List[bytes] + pgstac_fixture: Tuple[PgSTAC, Mock, Mock], + dupe_ndjson_lines: List[bytes], ) -> None: unique_items = list( - pgstac_fixture.unique_items(dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]) + pgstac_fixture[0].unique_items( + dupe_ndjson_lines, lambda b: orjson.loads(b)["id"] + ) ) assert len(dupe_ndjson_lines) == 5 @@ -220,10 +226,13 @@ def test_unique_items_deduplication( def test_unique_items_grouped_deduplication( - pgstac_fixture: Generator[PgSTAC, None, None], dupe_ndjson_lines: List[bytes] + pgstac_fixture: Tuple[PgSTAC, Mock, Mock], + dupe_ndjson_lines: List[bytes], ) -> None: unique_items = list( - pgstac_fixture.unique_items(dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]) + pgstac_fixture[0].unique_items( + dupe_ndjson_lines, lambda b: orjson.loads(b)["id"] + ) ) unique_ids = [orjson.loads(item)["id"] for item in unique_items] @@ -232,7 +241,7 @@ def test_unique_items_grouped_deduplication( groups = grouped(unique_items, size=3) - seen_items = [] + seen_items: List[str] = [] for group in groups: assert group seen_items.extend(orjson.loads(item)["id"] for item in group) @@ -245,24 +254,26 @@ def test_unique_items_grouped_deduplication( "insert_group_size, mode", [(None, Methods.upsert), (2, Methods.insert)] ) def test_ingest_items_deduplication_and_grouping( - pgstac_fixture: Generator[PgSTAC, None, None], + pgstac_fixture: Tuple[PgSTAC, Mock, Mock], duplicate_items: List[bytes], insert_group_size: Optional[int], mode: Methods, -): - captured_groups = [] - modes_passed = [] +) -> None: + captured_groups: List[Dict[str, List[bytes]]] = [] + modes_passed: List[Methods] = [] # gets the groups from the actual code path to evaluate without side effects - def mock_load_items(items_iter, insert_mode): + def mock_load_items( + items_iter: Generator[bytes, None, None], insert_mode: Methods + ) -> None: items_list = list(items_iter) captured_groups.append({"items": items_list}) modes_passed.append(insert_mode) return None - pgstac_fixture.loader.load_items = mock_load_items + pgstac_fixture[1].load_items = mock_load_items - pgstac_fixture.ingest_items( + pgstac_fixture[0].ingest_items( duplicate_items, mode=Methods.upsert, insert_group_size=insert_group_size ) @@ -285,20 +296,22 @@ def mock_load_items(items_iter, insert_mode): @pytest.mark.parametrize("mode", [Methods.upsert, Methods.insert]) def test_ingest_items_with_different_modes( - pgstac_fixture: Generator[PgSTAC, None, None], + pgstac_fixture: Tuple[PgSTAC, Mock, Mock], duplicate_items: List[bytes], mode: Methods, ) -> None: modes_passed = [] - def mock_load_items(items_iter, insert_mode): + def mock_load_items( + items_iter: Generator[bytes, None, None], insert_mode: Methods + ) -> None: modes_passed.append(insert_mode) list(items_iter) return None - pgstac_fixture.loader.load_items = mock_load_items + pgstac_fixture[1].load_items = mock_load_items - pgstac_fixture.ingest_items(duplicate_items, mode=mode) + pgstac_fixture[0].ingest_items(duplicate_items, mode=mode) assert len(modes_passed) == 1, "load_items should be called once" assert modes_passed[0] == mode, f"Mode should be {mode}" From d13dfc420db5120ba476eb8576605a5e626210d2 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 14:18:48 -0400 Subject: [PATCH 09/22] s5p;format --- datasets/sentinel-5p/Dockerfile | 10 +++++----- datasets/sentinel-5p/dataset.yaml | 2 +- pctasks/ingest_task/tests/test_items.py | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datasets/sentinel-5p/Dockerfile b/datasets/sentinel-5p/Dockerfile index 828da64b..f136c5ad 100644 --- a/datasets/sentinel-5p/Dockerfile +++ b/datasets/sentinel-5p/Dockerfile @@ -21,15 +21,15 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10 # See https://github.com/mapbox/rasterio/issues/1289 ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt -# Install Python 3.8 -RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" \ - && bash "Mambaforge-$(uname)-$(uname -m).sh" -b -p /opt/conda \ - && rm -rf "Mambaforge-$(uname)-$(uname -m).sh" +# Install Python 3.10 +RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" \ + && bash "Miniforge3-$(uname)-$(uname -m).sh" -b -p /opt/conda \ + && rm -rf "Miniforge3-$(uname)-$(uname -m).sh" ENV PATH /opt/conda/bin:$PATH ENV LD_LIBRARY_PATH /opt/conda/lib/:$LD_LIBRARY_PATH -RUN mamba install -y -c conda-forge python=3.8 gdal=3.3.3 pip setuptools cython numpy==1.21.5 +RUN mamba install -y -c conda-forge python=3.10 gdal libgdal-netcdf pip setuptools cython numpy RUN python -m pip install --upgrade pip diff --git a/datasets/sentinel-5p/dataset.yaml b/datasets/sentinel-5p/dataset.yaml index dbdeb57e..6d602b79 100644 --- a/datasets/sentinel-5p/dataset.yaml +++ b/datasets/sentinel-5p/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel_5p -image: ${{ args.registry }}/pctasks-sentinel-5p:20230630.3 +image: ${{ args.registry }}/pctasks-sentinel-5p:20250819.1 args: - registry diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index 6a6a1927..cfebf529 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -279,9 +279,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert len(captured_groups) == expected_group_count, ( - f"Expected {expected_group_count} groups" - ) + assert ( + len(captured_groups) == expected_group_count + ), f"Expected {expected_group_count} groups" all_items = [] for group in captured_groups: From 1cbd932089f331b28c011cfadf2bf0e5aae04e1d Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 14:57:35 -0400 Subject: [PATCH 10/22] readme update; fix test --- datasets/sentinel-3/README.md | 7 ++++--- pctasks/ingest_task/tests/test_items.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/datasets/sentinel-3/README.md b/datasets/sentinel-3/README.md index e559d41d..f8cceed7 100644 --- a/datasets/sentinel-3/README.md +++ b/datasets/sentinel-3/README.md @@ -32,12 +32,13 @@ $ PYTHONPATH=datasets/sentinel-3 python -m pytest datasets/sentinel-3/tests/ ```console $ ls datasets/sentinel-3/collection/ | xargs -I {} \ - pctasks dataset process-items '${{ args.since }}' \ + pctasks dataset process-items \ -d datasets/sentinel-3/dataset.yaml \ -c {} \ --workflow-id={}-update \ - --is-update-workflow \ - --upsert + --is-update-workflow {}-update \ + -u \ + -y ``` **Notes:** diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index cfebf529..024d7179 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -29,7 +29,7 @@ @pytest.fixture -def pgstac_fixture() -> Generator[Tuple[PgSTAC, Mock, Mock]]: +def pgstac_fixture() -> Generator[Tuple[PgSTAC, Mock, Mock], None, None]: with ( patch("pypgstac.db.PgstacDB") as MockPgstacDB, patch("pypgstac.load.Loader") as MockLoader, @@ -222,7 +222,7 @@ def test_unique_items_deduplication( unique_ids = [orjson.loads(item)["id"] for item in unique_items] assert len(unique_ids) == 3 - assert unique_ids == {"item1", "item2", "item3"} + assert unique_ids == ["item1", "item2", "item3"] def test_unique_items_grouped_deduplication( From 20a7a6725d1d29d173e92f2f0dcf61721ceb38f8 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 15:14:21 -0400 Subject: [PATCH 11/22] use proper mock loader; readme update --- datasets/sentinel-5p/README.md | 7 ++++--- pctasks/ingest_task/tests/test_items.py | 10 +++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/datasets/sentinel-5p/README.md b/datasets/sentinel-5p/README.md index 90565025..803f3a92 100644 --- a/datasets/sentinel-5p/README.md +++ b/datasets/sentinel-5p/README.md @@ -15,12 +15,13 @@ az acr build -r {the registry} --subscription {the subscription} -t pctasks-sent This collection is updated regularly. ```console -$ pctasks dataset process-items '${{ args.since }}' \ +$ pctasks dataset process-items \ -d datasets/sentinel-5p/dataset.yaml \ -c sentinel-5p-l2-netcdf \ --workflow-id=sentinel-5p-l2-netcdf-update \ - --is-update-workflow \ - --upsert + --is-update-workflow sentinel-5p-l2-netcdf-update \ + -u \ + -y ``` **Notes:** diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index 024d7179..cba69f0d 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -271,7 +271,7 @@ def mock_load_items( modes_passed.append(insert_mode) return None - pgstac_fixture[1].load_items = mock_load_items + pgstac_fixture[2].load_items = mock_load_items pgstac_fixture[0].ingest_items( duplicate_items, mode=Methods.upsert, insert_group_size=insert_group_size @@ -279,9 +279,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert ( - len(captured_groups) == expected_group_count - ), f"Expected {expected_group_count} groups" + assert len(captured_groups) == expected_group_count, ( + f"Expected {expected_group_count} groups" + ) all_items = [] for group in captured_groups: @@ -309,7 +309,7 @@ def mock_load_items( list(items_iter) return None - pgstac_fixture[1].load_items = mock_load_items + pgstac_fixture[2].load_items = mock_load_items pgstac_fixture[0].ingest_items(duplicate_items, mode=mode) From 0f9904e291b0f446ee68a3d240c2176d6fccdb8b Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 19 Aug 2025 15:27:07 -0400 Subject: [PATCH 12/22] format --- pctasks/ingest_task/tests/test_items.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index cba69f0d..d06b815c 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -279,9 +279,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert len(captured_groups) == expected_group_count, ( - f"Expected {expected_group_count} groups" - ) + assert ( + len(captured_groups) == expected_group_count + ), f"Expected {expected_group_count} groups" all_items = [] for group in captured_groups: From bd907642fbd0d54c17d8fe8a5d107c06ecd6eb1e Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 12:43:01 -0400 Subject: [PATCH 13/22] prevent create_item failures from failing the entire ingestion; log and continue --- pctasks/dataset/pctasks/dataset/items/task.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pctasks/dataset/pctasks/dataset/items/task.py b/pctasks/dataset/pctasks/dataset/items/task.py index e719cc9e..72ffa588 100644 --- a/pctasks/dataset/pctasks/dataset/items/task.py +++ b/pctasks/dataset/pctasks/dataset/items/task.py @@ -208,24 +208,25 @@ def create_items( asset_uri, args.collection_id, i=i, asset_count=asset_count ): result = self._create_item(asset_uri, storage_factory) + if isinstance(result, WaitTaskResult): + return result + else: + if not result: + logger.warning(f"No items created from {asset_uri}") + else: + results.extend( + validate_create_items_result( + result, + collection_id=args.collection_id, + skip_validation=args.options.skip_validation, + ) + ) except Exception as e: tb_str = traceback.format_exc() logger.error( f"Failed to create item from {asset_uri}: {type(e).__name__}: {str(e)}\n{tb_str}" # noqa: E501 ) - if isinstance(result, WaitTaskResult): - return result - else: - if not result: - logger.warning(f"No items created from {asset_uri}") - else: - results.extend( - validate_create_items_result( - result, - collection_id=args.collection_id, - skip_validation=args.options.skip_validation, - ) - ) + continue else: # Should be prevented by validator From 519ac85306225a5a0e81e7b20f066c9cd689adb8 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 15:40:21 -0400 Subject: [PATCH 14/22] fix postgres timeouts --- pctasks/ingest_task/tests/test_items.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index d06b815c..e5ed40a3 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -37,7 +37,7 @@ def pgstac_fixture() -> Generator[Tuple[PgSTAC, Mock, Mock], None, None]: mock_db = MockPgstacDB.return_value mock_loader = MockLoader.return_value - pgstac = PgSTAC("postgresql://dummy:dummy@localhost:5432/dummy") + pgstac = PgSTAC("postgresql://username:password@database:5432/postgis") yield (pgstac, mock_db, mock_loader) @@ -279,9 +279,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert ( - len(captured_groups) == expected_group_count - ), f"Expected {expected_group_count} groups" + assert len(captured_groups) == expected_group_count, ( + f"Expected {expected_group_count} groups" + ) all_items = [] for group in captured_groups: From 8e26fc7f93ed89b6327b6f0272a0e7d85eb99968 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 15:51:27 -0400 Subject: [PATCH 15/22] format --- pctasks/ingest_task/tests/test_items.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index e5ed40a3..e37ee84c 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -279,9 +279,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert len(captured_groups) == expected_group_count, ( - f"Expected {expected_group_count} groups" - ) + assert ( + len(captured_groups) == expected_group_count + ), f"Expected {expected_group_count} groups" all_items = [] for group in captured_groups: From b0c7301e6e6fc08b10250419bd9a3ab951cb5e63 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 16:08:22 -0400 Subject: [PATCH 16/22] update database version for cicd --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 864c4cc7..782c4f8d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: database: container_name: pctasks-database - image: ghcr.io/stac-utils/pgstac:v0.7.3 + image: ghcr.io/stac-utils/pgstac:v0.8.6 environment: - POSTGRES_USER=username - POSTGRES_PASSWORD=password From 3c037091a628ec7b8a9bc90f7fb84a352d7503de Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 16:25:36 -0400 Subject: [PATCH 17/22] use proper test items --- pctasks/dataset/pctasks/dataset/items/task.py | 29 ++++++++++--------- pctasks/ingest_task/tests/test_items.py | 19 +++--------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/pctasks/dataset/pctasks/dataset/items/task.py b/pctasks/dataset/pctasks/dataset/items/task.py index 72ffa588..9b4bd14a 100644 --- a/pctasks/dataset/pctasks/dataset/items/task.py +++ b/pctasks/dataset/pctasks/dataset/items/task.py @@ -178,21 +178,22 @@ def create_items( try: with traced_create_item(args.asset_uri, args.collection_id): result = self._create_item(args.asset_uri, storage_factory) + if isinstance(result, WaitTaskResult): + return result + elif result is None: + logger.warning(f"No items created from {args.asset_uri}") + else: + results.extend( + validate_create_items_result( + result, + collection_id=args.collection_id, + skip_validation=args.options.skip_validation, + ) + ) except Exception as e: - raise CreateItemsError( - f"Failed to create item from {args.asset_uri}" - ) from e - if isinstance(result, WaitTaskResult): - return result - elif result is None: - logger.warning(f"No items created from {args.asset_uri}") - else: - results.extend( - validate_create_items_result( - result, - collection_id=args.collection_id, - skip_validation=args.options.skip_validation, - ) + tb_str = traceback.format_exc() + logger.error( + f"Failed to create item from {args.asset_uri}: {type(e).__name__}: {str(e)}\n{tb_str}" # noqa: E501 ) elif args.asset_chunk_info: chunk_storage, chunk_path = storage_factory.get_storage_for_file( diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index e37ee84c..df122371 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -47,17 +47,6 @@ def dupe_ndjson_lines() -> List[bytes]: return [line.strip().encode("utf-8") for line in f.readlines() if line.strip()] -@pytest.fixture -def duplicate_items() -> List[bytes]: - return [ - b'{"id": "item1", "data": "value1"}', - b'{"id": "item2", "data": "value2"}', - b'{"id": "item1", "data": "value1"}', # Duplicate of item1 - b'{"id": "item3", "data": "value3"}', - b'{"id": "item2", "data": "value2"}', # Duplicate of item2 - ] - - @pytest.fixture def capture_loader_calls() -> Tuple[MagicMock, List[dict]]: captured_calls = [] @@ -255,7 +244,7 @@ def test_unique_items_grouped_deduplication( ) def test_ingest_items_deduplication_and_grouping( pgstac_fixture: Tuple[PgSTAC, Mock, Mock], - duplicate_items: List[bytes], + dupe_ndjson_lines: List[bytes], insert_group_size: Optional[int], mode: Methods, ) -> None: @@ -274,7 +263,7 @@ def mock_load_items( pgstac_fixture[2].load_items = mock_load_items pgstac_fixture[0].ingest_items( - duplicate_items, mode=Methods.upsert, insert_group_size=insert_group_size + dupe_ndjson_lines, mode=Methods.upsert, insert_group_size=insert_group_size ) expected_group_count = 1 if insert_group_size is None else insert_group_size @@ -297,7 +286,7 @@ def mock_load_items( @pytest.mark.parametrize("mode", [Methods.upsert, Methods.insert]) def test_ingest_items_with_different_modes( pgstac_fixture: Tuple[PgSTAC, Mock, Mock], - duplicate_items: List[bytes], + dupe_ndjson_lines: List[bytes], mode: Methods, ) -> None: modes_passed = [] @@ -311,7 +300,7 @@ def mock_load_items( pgstac_fixture[2].load_items = mock_load_items - pgstac_fixture[0].ingest_items(duplicate_items, mode=mode) + pgstac_fixture[0].ingest_items(dupe_ndjson_lines, mode=mode) assert len(modes_passed) == 1, "load_items should be called once" assert modes_passed[0] == mode, f"Mode should be {mode}" From 30dc84226f8d6095e5edd9b22ffeacd3a820a890 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Tue, 9 Sep 2025 16:49:05 -0400 Subject: [PATCH 18/22] use proper test items --- pctasks/ingest_task/tests/test_items.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index df122371..aebd61cb 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -261,16 +261,18 @@ def mock_load_items( return None pgstac_fixture[2].load_items = mock_load_items + pgstac_fixture[2].format_item = lambda item: item - pgstac_fixture[0].ingest_items( - dupe_ndjson_lines, mode=Methods.upsert, insert_group_size=insert_group_size - ) + with patch.object(pgstac_fixture[0], "loader", pgstac_fixture[2]): + pgstac_fixture[0].ingest_items( + dupe_ndjson_lines, mode=Methods.upsert, insert_group_size=insert_group_size + ) expected_group_count = 1 if insert_group_size is None else insert_group_size - assert ( - len(captured_groups) == expected_group_count - ), f"Expected {expected_group_count} groups" + assert len(captured_groups) == expected_group_count, ( + f"Expected {expected_group_count} groups" + ) all_items = [] for group in captured_groups: @@ -299,8 +301,10 @@ def mock_load_items( return None pgstac_fixture[2].load_items = mock_load_items + pgstac_fixture[2].format_item = lambda item: item - pgstac_fixture[0].ingest_items(dupe_ndjson_lines, mode=mode) + with patch.object(pgstac_fixture[0], "loader", pgstac_fixture[2]): + pgstac_fixture[0].ingest_items(dupe_ndjson_lines, mode=mode) assert len(modes_passed) == 1, "load_items should be called once" assert modes_passed[0] == mode, f"Mode should be {mode}" From 60d75b7ce6097317e84fb21233d05e742a957305 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Wed, 10 Sep 2025 08:46:09 -0400 Subject: [PATCH 19/22] format --- pctasks/ingest_task/tests/test_items.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py index aebd61cb..fbdc7f99 100644 --- a/pctasks/ingest_task/tests/test_items.py +++ b/pctasks/ingest_task/tests/test_items.py @@ -270,9 +270,9 @@ def mock_load_items( expected_group_count = 1 if insert_group_size is None else insert_group_size - assert len(captured_groups) == expected_group_count, ( - f"Expected {expected_group_count} groups" - ) + assert ( + len(captured_groups) == expected_group_count + ), f"Expected {expected_group_count} groups" all_items = [] for group in captured_groups: From 1321ab5d3cbfb2b9c091eb9aa892ece8f8f3db27 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Wed, 10 Sep 2025 09:41:35 -0400 Subject: [PATCH 20/22] update image s3/s5p --- datasets/sentinel-3/dataset.yaml | 2 +- datasets/sentinel-5p/dataset.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datasets/sentinel-3/dataset.yaml b/datasets/sentinel-3/dataset.yaml index 5dfeaeed..715b53c1 100644 --- a/datasets/sentinel-3/dataset.yaml +++ b/datasets/sentinel-3/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel-3 -image: ${{ args.registry }}/pctasks-sentinel-3:20250819.1 +image: ${{ args.registry }}/pctasks-sentinel-3:20250910.1 args: - registry diff --git a/datasets/sentinel-5p/dataset.yaml b/datasets/sentinel-5p/dataset.yaml index 6d602b79..da6cd42d 100644 --- a/datasets/sentinel-5p/dataset.yaml +++ b/datasets/sentinel-5p/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel_5p -image: ${{ args.registry }}/pctasks-sentinel-5p:20250819.1 +image: ${{ args.registry }}/pctasks-sentinel-5p:20250910.1 args: - registry From 7aa1a5c1ff1df7ee10a0db2335d292535f81ddc0 Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Wed, 24 Sep 2025 09:47:40 -0400 Subject: [PATCH 21/22] updates for s3 and s5p --- datasets/sentinel-3/Dockerfile | 2 +- datasets/sentinel-3/dataset.yaml | 2 +- datasets/sentinel-3/requirements.txt | 2 +- datasets/sentinel-3/sentinel_3.py | 3 +-- datasets/sentinel-5p/sentinel_5p.py | 1 - 5 files changed, 4 insertions(+), 6 deletions(-) diff --git a/datasets/sentinel-3/Dockerfile b/datasets/sentinel-3/Dockerfile index eb5fad36..2652b9d4 100644 --- a/datasets/sentinel-3/Dockerfile +++ b/datasets/sentinel-3/Dockerfile @@ -66,7 +66,7 @@ RUN cd /opt/src/pctasks/dataset && \ pip install . COPY ./datasets/sentinel-3/requirements.txt /opt/src/datasets/sentinel-3/requirements.txt -RUN python3 -m pip install -r /opt/src/datasets/sentinel-3/requirements.txt +RUN python3 -m pip install -r /opt/src/datasets/sentinel-3/requirements.txt # Setup Python Path to allow import of test modules ENV PYTHONPATH=/opt/src:$PYTHONPATH diff --git a/datasets/sentinel-3/dataset.yaml b/datasets/sentinel-3/dataset.yaml index 715b53c1..632f235d 100644 --- a/datasets/sentinel-3/dataset.yaml +++ b/datasets/sentinel-3/dataset.yaml @@ -1,5 +1,5 @@ id: sentinel-3 -image: ${{ args.registry }}/pctasks-sentinel-3:20250910.1 +image: ${{ args.registry }}/pctasks-sentinel-3:20250922.1 args: - registry diff --git a/datasets/sentinel-3/requirements.txt b/datasets/sentinel-3/requirements.txt index 3aa350eb..2387c907 100644 --- a/datasets/sentinel-3/requirements.txt +++ b/datasets/sentinel-3/requirements.txt @@ -1 +1 @@ -git+https://github.com/stactools-packages/sentinel3.git@36375cc63c053087380664ff931ceed5ad3b5f83 +git+https://github.com/stactools-packages/sentinel3.git@93518a430556f290d5e55d3ae0fa1d76cec26197 \ No newline at end of file diff --git a/datasets/sentinel-3/sentinel_3.py b/datasets/sentinel-3/sentinel_3.py index 6d648552..b3b90a00 100644 --- a/datasets/sentinel-3/sentinel_3.py +++ b/datasets/sentinel-3/sentinel_3.py @@ -12,7 +12,7 @@ import pctasks.dataset.collection from pctasks.core.models.task import WaitTaskResult -from pctasks.core.storage import Storage, StorageFactory +from pctasks.core.storage import StorageFactory from pctasks.core.utils.backoff import is_common_throttle_exception, with_backoff handler = logging.StreamHandler() @@ -240,7 +240,6 @@ class Sentinel3Collections(pctasks.dataset.collection.Collection): def create_item( cls, asset_uri: str, storage_factory: StorageFactory ) -> Union[List[pystac.Item], WaitTaskResult]: - # Only create Items for NT (Not Time critical) products sen3_archive = os.path.dirname(asset_uri) assert sen3_archive.endswith(".SEN3") diff --git a/datasets/sentinel-5p/sentinel_5p.py b/datasets/sentinel-5p/sentinel_5p.py index a155f76b..94d9c1f2 100644 --- a/datasets/sentinel-5p/sentinel_5p.py +++ b/datasets/sentinel-5p/sentinel_5p.py @@ -23,7 +23,6 @@ class Sentinel5pNetCDFCollection(Collection): def create_item( cls, asset_uri: str, storage_factory: StorageFactory ) -> Union[List[pystac.Item], WaitTaskResult]: - storage, nc_path = storage_factory.get_storage_for_file(asset_uri) with TemporaryDirectory() as tmp_dir: From 53b1b3f7f12bb9818d95c417bfd534035991903a Mon Sep 17 00:00:00 2001 From: Chad Peters Date: Wed, 24 Sep 2025 13:04:06 -0400 Subject: [PATCH 22/22] revert pgstac upgrade that check if tests still pass --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 782c4f8d..864c4cc7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,7 +14,7 @@ services: database: container_name: pctasks-database - image: ghcr.io/stac-utils/pgstac:v0.8.6 + image: ghcr.io/stac-utils/pgstac:v0.7.3 environment: - POSTGRES_USER=username - POSTGRES_PASSWORD=password