diff --git a/docs/guides/extensions/curator/metadata_curation.md b/docs/guides/extensions/curator/metadata_curation.md index 67f92c46b..4f7d336c8 100644 --- a/docs/guides/extensions/curator/metadata_curation.md +++ b/docs/guides/extensions/curator/metadata_curation.md @@ -10,6 +10,7 @@ By following this guide, you will: - Create a metadata curation workflow with automatic validation - Set up either file-based or record-based metadata collection - Configure curation tasks that guide collaborators through metadata entry +- Retrieve and analyze detailed validation results to identify data quality issues ## Prerequisites @@ -178,6 +179,256 @@ print(f" EntityView: {entity_view_id}") print(f" CurationTask: {task_id}") ``` +## Step 4: Work with metadata and validate (Record-based workflow) + +After creating a record-based metadata task, collaborators can enter metadata through the Grid interface. Once metadata entry is complete, you'll want to validate the data against your schema and identify any issues. + +### The metadata curation workflow + +1. **Data Entry**: Collaborators use the Grid interface (via the curation task link in the Synapse web UI) to enter metadata +2. **Grid Export**: Export the Grid session back to the RecordSet to save changes (this can be done via the web UI or programmatically) +3. **Validation**: Retrieve detailed validation results to identify schema violations +4. **Correction**: Fix any validation errors and repeat as needed + +### Creating and exporting a Grid session + +Validation results are only generated when a Grid session is exported back to the RecordSet. This triggers Synapse to validate each row against the bound schema. You have two options: + +**Option A: Via the Synapse web UI (most common)** + +Users can access the curation task through the Synapse web interface, enter/edit data in the Grid, and click the export button. This automatically generates validation results. + +**Option B: Programmatically create and export a Grid session** + +```python +from synapseclient import Synapse +from synapseclient.models import RecordSet +from synapseclient.models.curation import Grid + +syn = Synapse() +syn.login() + +# Get your RecordSet (must have a schema bound) +record_set = RecordSet(id="syn987654321").get() + +# Create a Grid session from the RecordSet +grid = Grid(record_set_id=record_set.id).create() + +# At this point, users can interact with the Grid (either programmatically or via web UI) +# When ready to save changes and generate validation results, export back to RecordSet +grid.export_to_record_set() + +# Clean up the Grid session +grid.delete() + +# Re-fetch the RecordSet to get the updated validation_file_handle_id +record_set = RecordSet(id=record_set.id).get() +``` + +**Important**: The `validation_file_handle_id` attribute is only populated after a Grid export operation. Until then, `get_detailed_validation_results()` will return `None`. + +### Getting detailed validation results + +After exporting from a Grid session with a bound schema, Synapse automatically validates each row against the schema and generates a detailed validation report. 
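+Before loading the report, you can confirm that one exists by checking the `validation_file_handle_id` attribute on a freshly fetched RecordSet. A minimal sketch (the entity ID is the same placeholder used in the surrounding examples):
+
+```python
+from synapseclient import Synapse
+from synapseclient.models import RecordSet
+
+syn = Synapse()
+syn.login()
+
+# Re-fetch the RecordSet so the latest file handle references are populated
+record_set = RecordSet(id="syn987654321").get()
+
+if record_set.validation_file_handle_id is None:
+    # No report yet: the Grid session has not been exported since the schema was bound
+    print("No validation results yet; export the Grid session first.")
+```
+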
Here's how to retrieve and analyze those results: + +```python +from synapseclient import Synapse +from synapseclient.models import RecordSet + +syn = Synapse() +syn.login() + +# After Grid export (either via web UI or programmatically) +# retrieve the updated RecordSet +record_set = RecordSet(id="syn987654321").get() + +# Get detailed validation results as a pandas DataFrame +validation_results = record_set.get_detailed_validation_results() + +if validation_results is not None: + print(f"Total rows validated: {len(validation_results)}") + + # Filter for valid and invalid rows + valid_rows = validation_results[validation_results['is_valid'] == True] + invalid_rows = validation_results[validation_results['is_valid'] == False] + + print(f"Valid rows: {len(valid_rows)}") + print(f"Invalid rows: {len(invalid_rows)}") + + # Display details of any validation errors + if len(invalid_rows) > 0: + print("\nRows with validation errors:") + for idx, row in invalid_rows.iterrows(): + print(f"\nRow {row['row_index']}:") + print(f" Error: {row['validation_error_message']}") + print(f" ValidationError: {row['all_validation_messages']}") +else: + print("No validation results available. The Grid session must be exported to generate validation results.") +``` + +### Example: Complete validation workflow for animal study metadata + +This example demonstrates the full workflow from creating a curation task through validating the submitted metadata: + +```python +from synapseclient import Synapse +from synapseclient.extensions.curator import create_record_based_metadata_task, query_schema_registry +from synapseclient.models import RecordSet +from synapseclient.models.curation import Grid +import pandas as pd +import tempfile +import os +import time + +syn = Synapse() +syn.login() + +# Step 1: Find the schema +schema_uri = query_schema_registry( + synapse_client=syn, + dcc="ad", + datatype="IndividualAnimalMetadataTemplate" +) + +# Step 1.5: Create initial test data with validation examples +# Row 1: VALID - all required fields present and valid +# Row 2: INVALID - missing required field 'genotype' +# Row 3: INVALID - invalid enum value for 'sex' ("other" not in enum) +test_data = pd.DataFrame({ + "individualID": ["ANIMAL001", "ANIMAL002", "ANIMAL003"], + "species": ["Mouse", "Mouse", "Mouse"], + "sex": ["female", "male", "other"], # Row 3: invalid enum + "genotype": ["5XFAD", None, "APOE4KI"], # Row 2: missing required field + "genotypeBackground": ["C57BL/6J", "C57BL/6J", "C57BL/6J"], + "modelSystemName": ["5XFAD", "5XFAD", "APOE4KI"], + "dateBirth": ["2024-01-15", "2024-02-20", "2024-03-10"], + "individualIdSource": ["JAX", "JAX", "JAX"], +}) + +# Create a temporary CSV file with the test data +temp_fd, temp_csv = tempfile.mkstemp(suffix=".csv") +os.close(temp_fd) +test_data.to_csv(temp_csv, index=False) + +# Step 2: Create the curation task (this creates an empty template RecordSet) +record_set, curation_task, data_grid = create_record_based_metadata_task( + synapse_client=syn, + project_id="syn123456789", + folder_id="syn987654321", + record_set_name="AnimalMetadata_Records", + record_set_description="Animal study metadata with validation", + curation_task_name="AnimalMetadata_Validation_Example", + upsert_keys=["individualID"], + instructions="Enter metadata for each animal. 
All required fields must be completed.", + schema_uri=schema_uri, + bind_schema_to_record_set=True, +) + +time.sleep(10) + +print(f"Curation task created with ID: {curation_task.task_id}") +print(f"RecordSet created with ID: {record_set.id}") + +# Step 2.5: Upload the test data to the RecordSet +record_set = RecordSet(id=record_set.id).get(synapse_client=syn) +print("\nUploading test data to RecordSet...") +record_set.path = temp_csv +record_set = record_set.store(synapse_client=syn) +print(f"Test data uploaded to RecordSet {record_set.id}") + +# Step 3: Collaborators enter data via the web UI, OR you can create/export a Grid programmatically +# For demonstration, here's the programmatic approach: +print("\nCreating Grid session for data entry...") +grid = Grid(record_set_id=record_set.id).create() +print("Grid session created. Users can now enter data.") + +# After data entry is complete (either via web UI or programmatically), +# export the Grid to generate validation results +print("\nExporting Grid to RecordSet to generate validation results...") +grid.export_to_record_set() + +# Clean up the Grid session +grid.delete() +print("Grid session exported and deleted.") + +# Step 4: Refresh the RecordSet to get the latest validation results +print("\nRefreshing RecordSet to retrieve validation results...") +record_set = RecordSet(id=record_set.id).get() + +# Step 5: Analyze validation results +validation_df = record_set.get_detailed_validation_results() + +if validation_df is not None: + # Summary statistics + total_rows = len(validation_df) + valid_count = (validation_df['is_valid'] == True).sum() # noqa: E712 + invalid_count = (validation_df['is_valid'] == False).sum() # noqa: E712 + + print("\n=== Validation Summary ===") + print(f"Total records: {total_rows}") + print(f"Valid records: {valid_count} ({valid_count}/{total_rows})") + print(f"Invalid records: {invalid_count} ({invalid_count}/{total_rows})") + + # Group errors by type for better understanding + if invalid_count > 0: + invalid_rows = validation_df[validation_df['is_valid'] == False] # noqa: E712 + + # Export detailed error report for review + error_report = invalid_rows[['row_index', 'validation_error_message', 'all_validation_messages']] + error_report_path = "validation_errors_report.csv" + error_report.to_csv(error_report_path, index=False) + print(f"\nDetailed error report saved to: {error_report_path}") + + # Show first few errors as examples + print("\n=== Sample Validation Errors ===") + for idx, row in error_report.head(3).iterrows(): + print(f"\nRow {row['row_index']}:") + print(f" Error: {row['validation_error_message']}") + print(f" ValidationError: {row['all_validation_messages']}") + +# Clean up temporary file +if os.path.exists(temp_csv): + os.unlink(temp_csv) +``` + +In this example you would expect to get results like: + +``` +=== Sample Validation Errors === + +Row 0: + Error: expected type: String, found: Long + ValidationError: ["#/dateBirth: expected type: String, found: Long"] + +Row 1: + Error: 2 schema violations found + ValidationError: ["#/genotype: expected type: String, found: Null","#/dateBirth: expected type: String, found: Long"] + +Row 2: + Error: 2 schema violations found + ValidationError: ["#/dateBirth: expected type: String, found: Long","#/sex: other is not a valid enum value"] +``` + +**Key points about validation results:** + +- **Automatic generation**: Validation results are created automatically when you export data from a Grid session with a bound schema +- **Row-level detail**: Each 
row in your RecordSet gets its own validation status and error messages +- **Multiple violations**: The `all_validation_messages` column contains all schema violations for a row, not just the first one +- **Iterative correction**: Use the validation results to identify issues, make corrections in the Grid, export again, and re-validate + +### When validation results are available + +Validation results are only available after: +1. A JSON schema has been bound to the RecordSet (set `bind_schema_to_record_set=True` when creating the task) +2. Data has been entered through a Grid session +3. **The Grid session has been exported back to the RecordSet** - This is the critical step that triggers validation and populates the `validation_file_handle_id` attribute + +The export can happen in two ways: +- **Via the Synapse web UI**: Users click the export/save button in the Grid interface +- **Programmatically**: Call `grid.export_to_record_set()` after creating a Grid session + +If `get_detailed_validation_results()` returns `None`, the most common reason is that the Grid session hasn't been exported yet. Check that `record_set.validation_file_handle_id` is not `None` after exporting. + ## Additional utilities ### Validate schema binding on folders @@ -227,6 +478,9 @@ for curation_task in CurationTask.list( - [query_schema_registry][synapseclient.extensions.curator.query_schema_registry] - Search for schemas in the registry - [create_record_based_metadata_task][synapseclient.extensions.curator.create_record_based_metadata_task] - Create RecordSet-based curation workflows - [create_file_based_metadata_task][synapseclient.extensions.curator.create_file_based_metadata_task] - Create EntityView-based curation workflows +- [RecordSet.get_detailed_validation_results][synapseclient.models.RecordSet.get_detailed_validation_results] - Get detailed validation results for RecordSet data +- [Grid.create][synapseclient.models.curation.Grid.create] - Create a Grid session from a RecordSet +- [Grid.export_to_record_set][synapseclient.models.curation.Grid.export_to_record_set] - Export Grid data back to RecordSet and generate validation results - [Folder.bind_schema][synapseclient.models.Folder.bind_schema] - Bind schemas to folders - [Folder.validate_schema][synapseclient.models.Folder.validate_schema] - Validate folder schema compliance - [CurationTask.list][synapseclient.models.CurationTask.list] - List curation tasks in a project diff --git a/docs/reference/experimental/async/curator.md b/docs/reference/experimental/async/curator.md index e1d1e101f..bf292948b 100644 --- a/docs/reference/experimental/async/curator.md +++ b/docs/reference/experimental/async/curator.md @@ -25,6 +25,7 @@ at your own risk. - get_async - store_async - delete_async + - get_detailed_validation_results_async - get_acl_async - get_permissions_async - set_permissions_async diff --git a/docs/reference/experimental/sync/curator.md b/docs/reference/experimental/sync/curator.md index 58b62f675..b02244aab 100644 --- a/docs/reference/experimental/sync/curator.md +++ b/docs/reference/experimental/sync/curator.md @@ -25,6 +25,7 @@ at your own risk. - get - store - delete + - get_detailed_validation_results - get_acl - get_permissions - set_permissions diff --git a/synapseclient/core/typing_utils.py b/synapseclient/core/typing_utils.py new file mode 100644 index 000000000..608112bee --- /dev/null +++ b/synapseclient/core/typing_utils.py @@ -0,0 +1,32 @@ +"""Typing utilities for optional dependencies. 
+ +This module provides type aliases for optional dependencies like pandas and numpy, +allowing proper type checking without requiring these packages to be installed. +""" + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + try: + from pandas import DataFrame, Series + except ImportError: + DataFrame = Any # type: ignore[misc, assignment] + Series = Any # type: ignore[misc, assignment] + + try: + import numpy as np + except ImportError: + np = Any # type: ignore[misc, assignment] + + try: + import networkx as nx + except ImportError: + nx = Any # type: ignore[misc, assignment] +else: + # At runtime, use object as a placeholder + DataFrame = object + Series = object + np = object # type: ignore[misc, assignment] + nx = object # type: ignore[misc, assignment] + +__all__ = ["DataFrame", "Series", "np", "nx"] diff --git a/synapseclient/core/upload/multipart_upload_async.py b/synapseclient/core/upload/multipart_upload_async.py index 55085d8ee..7e6ac3d6e 100644 --- a/synapseclient/core/upload/multipart_upload_async.py +++ b/synapseclient/core/upload/multipart_upload_async.py @@ -79,7 +79,6 @@ Mapping, Optional, Tuple, - TypeVar, Union, ) @@ -107,6 +106,7 @@ ) from synapseclient.core.otel_config import get_tracer from synapseclient.core.retry import with_retry_time_based +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.upload.upload_utils import ( copy_md5_fn, copy_part_request_body_provider_fn, @@ -123,8 +123,6 @@ if TYPE_CHECKING: from synapseclient import Synapse -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - # AWS limits MAX_NUMBER_OF_PARTS = 10000 MIN_PART_SIZE = 5 * MB diff --git a/synapseclient/core/upload/upload_utils.py b/synapseclient/core/upload/upload_utils.py index 9ecefefc4..9175dfd0e 100644 --- a/synapseclient/core/upload/upload_utils.py +++ b/synapseclient/core/upload/upload_utils.py @@ -3,9 +3,9 @@ import math import re from io import BytesIO, StringIO -from typing import Any, Dict, Optional, TypeVar, Union +from typing import Any, Dict, Optional, Union -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE def get_partial_dataframe_chunk( diff --git a/synapseclient/core/utils.py b/synapseclient/core/utils.py index b221386f5..e02c0487b 100644 --- a/synapseclient/core/utils.py +++ b/synapseclient/core/utils.py @@ -1548,3 +1548,22 @@ def log_dataclass_diff( value2 = getattr(obj2, field.name) if value1 != value2: logger.info(f"{prefix}{field.name}: {value1} -> {value2}") + + +def test_import_pandas() -> None: + """This function is called within other functions and methods to ensure that pandas is installed.""" + try: + import pandas as pd # noqa F401 + # used to catch when pandas isn't installed + except ModuleNotFoundError: + raise ModuleNotFoundError( + """\n\nThe pandas package is required for this function!\n + Most functions in the synapseclient package don't require the + installation of pandas, but some do. Please refer to the installation + instructions at: http://pandas.pydata.org/ or + https://python-docs.synapse.org/tutorials/installation/#installation-guide-for-pypi-users. 
+ \n\n\n""" + ) + # catch other errors (see SYNPY-177) + except: # noqa + raise diff --git a/synapseclient/extensions/curator/record_based_metadata_task.py b/synapseclient/extensions/curator/record_based_metadata_task.py index 33f6fe672..d35b084ef 100644 --- a/synapseclient/extensions/curator/record_based_metadata_task.py +++ b/synapseclient/extensions/curator/record_based_metadata_task.py @@ -6,9 +6,11 @@ in Synapse, including RecordSet creation, CurationTask setup, and Grid view initialization. """ import tempfile -from typing import Any, Dict, List, Optional, Tuple, TypeVar +from typing import Any, Dict, List, Optional, Tuple from synapseclient import Synapse +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE +from synapseclient.core.utils import test_import_pandas from synapseclient.models import ( CurationTask, Grid, @@ -17,8 +19,6 @@ RecordSet, ) -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - def extract_property_titles(schema_data: Dict[str, Any]) -> List[str]: """ @@ -50,6 +50,7 @@ def create_dataframe_from_titles(titles: List[str]) -> DATA_FRAME_TYPE: Returns: Empty DataFrame with titles as columns """ + test_import_pandas() import pandas as pd if not titles: diff --git a/synapseclient/extensions/curator/schema_generation.py b/synapseclient/extensions/curator/schema_generation.py index 1580abeb4..f9854c49d 100644 --- a/synapseclient/extensions/curator/schema_generation.py +++ b/synapseclient/extensions/curator/schema_generation.py @@ -14,6 +14,7 @@ from pathlib import Path from string import whitespace from typing import ( + TYPE_CHECKING, AbstractSet, Any, Callable, @@ -29,6 +30,8 @@ from deprecated import deprecated +from synapseclient.core.utils import test_import_pandas + try: from dataclasses_json import config, dataclass_json except ImportError: @@ -59,12 +62,19 @@ def camelize(string, uppercase_first_letter=True): Namespace = None # type: ignore from synapseclient import Synapse - -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") -NUMPY_INT_64 = TypeVar("np.int64") -MULTI_GRAPH_TYPE = TypeVar("nx.MultiDiGraph") -GRAPH_TYPE = TypeVar("nx.Graph") -DI_GRAPH_TYPE = TypeVar("nx.DiGraph") +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE +from synapseclient.core.typing_utils import np, nx + +if TYPE_CHECKING: + NUMPY_INT_64 = np.int64 + MULTI_GRAPH_TYPE = nx.MultiDiGraph + GRAPH_TYPE = nx.Graph + DI_GRAPH_TYPE = nx.DiGraph +else: + NUMPY_INT_64 = object + MULTI_GRAPH_TYPE = object + GRAPH_TYPE = object + DI_GRAPH_TYPE = object X = TypeVar("X") @@ -328,6 +338,7 @@ def find_and_convert_ints( is_int: dataframe with boolean values indicating which cells were converted to type int """ + test_import_pandas() from pandarallel import pandarallel from pandas import DataFrame from pandas.api.types import is_integer @@ -379,6 +390,7 @@ def convert_floats(dataframe: DATA_FRAME_TYPE) -> DATA_FRAME_TYPE: Returns: float_df: dataframe with values that were converted to type float. Columns are type object """ + test_import_pandas() from pandas import to_numeric # create a separate copy of the manifest @@ -396,6 +408,7 @@ def convert_floats(dataframe: DATA_FRAME_TYPE) -> DATA_FRAME_TYPE: def get_str_pandas_na_values() -> List[str]: + test_import_pandas() from pandas._libs.parsers import STR_NA_VALUES # type: ignore STR_NA_VALUES_FILTERED = deepcopy(STR_NA_VALUES) @@ -426,6 +439,7 @@ def read_csv( Returns: pd.DataFrame: The dataframe created from the CSV file or buffer. 
""" + test_import_pandas() from pandas import read_csv as pandas_read_csv STR_NA_VALUES_FILTERED = get_str_pandas_na_values() @@ -469,6 +483,7 @@ def load_df( pd.DataFrame: a processed dataframe for manifests or unprocessed df for data models and where indicated """ + test_import_pandas() from pandas import DataFrame # Read CSV to df as type specified in kwargs @@ -648,6 +663,7 @@ def gather_csv_attributes_relationships( Relationships: { CSV Header: Value}}} """ + test_import_pandas() from pandas import isnull # Check csv schema follows expectations. @@ -697,6 +713,7 @@ def parse_column_type(self, attr: dict) -> dict: dict: A dictionary containing the parsed column type information if present else an empty dict """ + test_import_pandas() from pandas import isna column_type = attr.get("columnType") @@ -727,6 +744,7 @@ def parse_format(self, attribute_dict: dict) -> dict[str, str]: A dictionary containing the format value if it exists else an empty dict """ + test_import_pandas() from pandas import isna format_value = attribute_dict.get("Format") @@ -3901,6 +3919,7 @@ def parsed_model_as_dataframe( Returns: pd.DataFrame, DataFrame representation of the parsed model. """ + test_import_pandas() from pandas import DataFrame # Convert the parsed model dictionary to a DataFrame diff --git a/synapseclient/models/curation.py b/synapseclient/models/curation.py index 73ef02e22..b1ed54b0b 100644 --- a/synapseclient/models/curation.py +++ b/synapseclient/models/curation.py @@ -1148,7 +1148,7 @@ class GridSynchronousProtocol(Protocol): def create( self, - attach_to_previous_session=True, + attach_to_previous_session=False, *, timeout: int = 120, synapse_client: Optional[Synapse] = None, @@ -1158,7 +1158,7 @@ def create( Arguments: attach_to_previous_session: If True and using `record_set_id`, will attach - to an existing active session if one exists. Defaults to True. + to an existing active session if one exists. Defaults to False. timeout: The number of seconds to wait for the job to complete or progress before raising a SynapseTimeoutError. Defaults to 120. synapse_client: If not passed in and caching was not disabled by @@ -1448,7 +1448,7 @@ class Grid(GridSynchronousProtocol): async def create_async( self, - attach_to_previous_session=True, + attach_to_previous_session=False, *, timeout: int = 120, synapse_client: Optional[Synapse] = None, @@ -1462,7 +1462,7 @@ async def create_async( Arguments: attach_to_previous_session: If True and using `record_set_id`, will attach - to an existing active session if one exists. Defaults to True. + to an existing active session if one exists. Defaults to False. timeout: The number of seconds to wait for the job to complete or progress before raising a SynapseTimeoutError. Defaults to 120. 
synapse_client: If not passed in and caching was not disabled by diff --git a/synapseclient/models/dataset.py b/synapseclient/models/dataset.py index 646b226b2..f3b927546 100644 --- a/synapseclient/models/dataset.py +++ b/synapseclient/models/dataset.py @@ -11,6 +11,7 @@ from synapseclient.api.table_services import ViewEntityType, ViewTypeMask from synapseclient.core.async_utils import async_to_sync from synapseclient.core.constants import concrete_types +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.utils import MB, delete_none_keys from synapseclient.models import Activity, Annotations from synapseclient.models.mixins.access_control import AccessControllable @@ -24,11 +25,7 @@ ViewStoreMixin, ViewUpdateMixin, ) -from synapseclient.models.table_components import ( - DATA_FRAME_TYPE, - Column, - TableUpdateTransaction, -) +from synapseclient.models.table_components import Column, TableUpdateTransaction if TYPE_CHECKING: from synapseclient.models import File, Folder diff --git a/synapseclient/models/entityview.py b/synapseclient/models/entityview.py index 0463dbe57..bbc0ae006 100644 --- a/synapseclient/models/entityview.py +++ b/synapseclient/models/entityview.py @@ -3,7 +3,7 @@ from copy import deepcopy from dataclasses import dataclass, field from datetime import date, datetime -from typing import Any, Dict, List, Optional, Protocol, Set, TypeVar, Union +from typing import Any, Dict, List, Optional, Protocol, Set, Union from typing_extensions import Self @@ -11,6 +11,7 @@ from synapseclient.api import ViewTypeMask from synapseclient.core.async_utils import async_to_sync from synapseclient.core.constants import concrete_types +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.utils import MB, delete_none_keys from synapseclient.models import Activity, Annotations from synapseclient.models.mixins import AccessControllable, BaseJSONSchema @@ -27,8 +28,6 @@ ) from synapseclient.models.table_components import Column -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - class EntityViewSynchronousProtocol(Protocol): def store( diff --git a/synapseclient/models/mixins/json_schema.py b/synapseclient/models/mixins/json_schema.py index 8d04ea4f5..4fd9db0a1 100644 --- a/synapseclient/models/mixins/json_schema.py +++ b/synapseclient/models/mixins/json_schema.py @@ -53,6 +53,11 @@ class JSONSchemaVersionInfo: created_by: str """The Synapse user ID of the creator of the schema version.""" + @property + def json_schema_uri(self) -> str: + """The JSON schema URI constructed from organization name, schema name, and semantic version.""" + return f"{self.organization_name}-{self.schema_name}-{self.semantic_version}" + @dataclass class JSONSchemaBinding: diff --git a/synapseclient/models/mixins/table_components.py b/synapseclient/models/mixins/table_components.py index eb094b722..dc583975b 100644 --- a/synapseclient/models/mixins/table_components.py +++ b/synapseclient/models/mixins/table_components.py @@ -11,7 +11,7 @@ from dataclasses import dataclass, field from datetime import datetime from io import BytesIO -from typing import Any, Dict, List, Optional, Protocol, Tuple, TypeVar, Union +from typing import Any, Dict, List, Optional, Protocol, Tuple, Union from tqdm import tqdm from tqdm.contrib.logging import logging_redirect_tqdm @@ -35,6 +35,8 @@ ensure_download_location_is_directory, ) from synapseclient.core.exceptions import SynapseTimeoutError +from synapseclient.core.typing_utils import DataFrame as 
DATA_FRAME_TYPE +from synapseclient.core.typing_utils import Series as SERIES_TYPE from synapseclient.core.upload.multipart_upload_async import ( multipart_upload_dataframe_async, multipart_upload_file_async, @@ -45,6 +47,7 @@ extract_synapse_id_from_query, log_dataclass_diff, merge_dataclass_entities, + test_import_pandas, ) from synapseclient.models import Activity from synapseclient.models.services.search import get_id @@ -108,9 +111,6 @@ "ROW_HASH_CODE", ] -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") -SERIES_TYPE = TypeVar("pd.Series") - LIST_COLUMN_TYPES = { "STRING_LIST", "INTEGER_LIST", @@ -121,25 +121,6 @@ } -def test_import_pandas() -> None: - """This function is called within other functions and methods to ensure that pandas is installed.""" - try: - import pandas as pd # noqa F401 - # used to catch when pandas isn't installed - except ModuleNotFoundError: - raise ModuleNotFoundError( - """\n\nThe pandas package is required for this function!\n - Most functions in the synapseclient package don't require the - installation of pandas, but some do. Please refer to the installation - instructions at: http://pandas.pydata.org/ or - https://python-docs.synapse.org/tutorials/installation/#installation-guide-for-pypi-users. - \n\n\n""" - ) - # catch other errors (see SYNPY-177) - except: # noqa - raise - - def row_labels_from_id_and_version(rows): return ["_".join(map(str, row)) for row in rows] diff --git a/synapseclient/models/recordset.py b/synapseclient/models/recordset.py index ec8e5dd13..87bb877ef 100644 --- a/synapseclient/models/recordset.py +++ b/synapseclient/models/recordset.py @@ -15,11 +15,17 @@ from synapseclient.core import utils from synapseclient.core.async_utils import async_to_sync from synapseclient.core.constants import concrete_types +from synapseclient.core.download import download_by_file_handle +from synapseclient.core.download.download_functions import ( + ensure_download_location_is_directory, +) from synapseclient.core.exceptions import SynapseFileNotFoundError +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.utils import ( delete_none_keys, guess_file_name, merge_dataclass_entities, + test_import_pandas, ) from synapseclient.models import Activity, Annotations from synapseclient.models.mixins import AccessControllable, BaseJSONSchema @@ -277,6 +283,79 @@ def delete( """ return None + def get_detailed_validation_results( + self, + download_location: Optional[str] = None, + *, + synapse_client: Optional[Synapse] = None, + ) -> DATA_FRAME_TYPE: + """ + Get detailed validation results for the RecordSet as a pandas DataFrame. + + This method downloads a CSV file containing detailed validation results for each row + in the RecordSet. The validation results are generated when a RecordSet with a bound + JSON schema is exported from a Grid session. The CSV contains columns: + - row_index: The index of the row in the RecordSet + - is_valid: Boolean indicating if the row is valid according to the schema + - validation_error_message: The primary validation error message (if any) + - all_validation_messages: All validation messages for the row (if any) + + Arguments: + download_location: Optional directory path where the validation results CSV + should be downloaded. If not specified, the file will be downloaded to + the Synapse cache directory. If the file is already cached, it will use + the cached version unless a different download_location is specified. 
+ synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)`, this will use the last created + instance from the Synapse class constructor. + + Returns: + A pandas DataFrame containing the validation results, or None if no + validation_file_handle_id is available (with a warning logged). + + Example: Get validation results for a RecordSet + Get detailed validation results after exporting from a Grid session: + + ```python + from synapseclient import Synapse + from synapseclient.models import RecordSet, Grid + + syn = Synapse() + syn.login() + + # Assuming you have a RecordSet with a bound schema + record_set = RecordSet(id="syn123").get() + + # Create and export Grid session to generate validation results + grid = Grid(record_set_id=record_set.id).create() + grid.export_to_record_set() + grid.delete() + + # Re-fetch the RecordSet to get updated validation_file_handle_id + record_set = record_set.get() + + # Get the detailed validation results + results_df = record_set.get_detailed_validation_results() + + # Analyze the results + print(f"Total rows: {len(results_df)}") + print(f"Columns: {results_df.columns.tolist()}") + + # Filter for valid and invalid rows + # Note: is_valid is boolean (True/False) for validated rows + valid_rows = results_df[results_df['is_valid'] == True] # noqa: E712 + invalid_rows = results_df[results_df['is_valid'] == False] # noqa: E712 + + print(f"Valid rows: {len(valid_rows)}") + print(f"Invalid rows: {len(invalid_rows)}") + + # View invalid rows with their error messages + if len(invalid_rows) > 0: + print(invalid_rows[['row_index', 'validation_error_message']]) + ``` + """ + return None + @dataclass() @async_to_sync @@ -329,6 +408,11 @@ class RecordSet(RecordSetSynchronousProtocol, AccessControllable, BaseJSONSchema file_name_override: An optional replacement for the name of the uploaded file. This is distinct from the entity name. If omitted the file will retain its original name. + validation_file_handle_id: (Read Only) Pointer to a CSV file that contains the + detailed validation results for each row in the record set. The CSV file + will contain for each row the following columns: row_index, is_valid, + validation_error_message, all_validation_messages. Generated only from a + grid session export, cannot be changed by the user. content_type: (New Upload Only) Used to manually specify Content-type header, for example 'application/png' or 'application/json; charset=UTF-8'. If not specified, the content type will be derived from the file extension. @@ -502,6 +586,14 @@ class RecordSet(RecordSetSynchronousProtocol, AccessControllable, BaseJSONSchema the entity name. If omitted the file will retain its original name. """ + validation_file_handle_id: Optional[str] = None + """ + (Read Only) Pointer to a CSV file that contains the detailed validation results for + each row in the record set. The CSV file will contain for each row the following + columns: row_index, is_valid, validation_error_message, all_validation_messages. + Generated only from a grid session export, cannot be changed by the user. 
+ """ + content_type: Optional[str] = None """ (New Upload Only) @@ -735,6 +827,7 @@ def fill_from_dict( self.data_file_handle_id = entity.get("dataFileHandleId", None) self.path = entity.get("path", self.path) self.file_name_override = entity.get("fileNameOverride", None) + self.validation_file_handle_id = entity.get("validationFileHandleId", None) csv_descriptor = entity.get("csvDescriptor", None) if csv_descriptor: from synapseclient.models import CsvTableDescriptor @@ -1207,6 +1300,125 @@ async def main(): ) return self + async def get_detailed_validation_results_async( + self, + download_location: Optional[str] = None, + *, + synapse_client: Optional[Synapse] = None, + ) -> DATA_FRAME_TYPE: + """ + Get detailed validation results for the RecordSet as a pandas DataFrame. + + This method downloads a CSV file containing detailed validation results for each row + in the RecordSet. The validation results are generated when a RecordSet with a bound + JSON schema is exported from a Grid session. The CSV contains columns: + - row_index: The index of the row in the RecordSet + - is_valid: Boolean indicating if the row is valid according to the schema + - validation_error_message: The primary validation error message (if any) + - all_validation_messages: All validation messages for the row (if any) + + Arguments: + download_location: Optional directory path where the validation results CSV + should be downloaded. If not specified, the file will be downloaded to + the Synapse cache directory. If the file is already cached, it will use + the cached version unless a different download_location is specified. + synapse_client: If not passed in and caching was not disabled by + `Synapse.allow_client_caching(False)`, this will use the last created + instance from the Synapse class constructor. + + Returns: + A pandas DataFrame containing the validation results, or None if no + validation_file_handle_id is available (with a warning logged). 
+ + Example: Get validation results for a RecordSet + Get detailed validation results after exporting from a Grid session: + + ```python + import asyncio + from synapseclient import Synapse + from synapseclient.models import RecordSet, Grid + + async def main(): + syn = Synapse() + syn.login() + + # Assuming you have a RecordSet with a bound schema + record_set = await RecordSet(id="syn123").get_async() + + # Create and export Grid session to generate validation results + grid = await Grid(record_set_id=record_set.id).create_async() + await grid.export_to_record_set_async() + await grid.delete_async() + + # Re-fetch the RecordSet to get updated validation_file_handle_id + record_set = await record_set.get_async() + + # Get the detailed validation results + results_df = await record_set.get_detailed_validation_results_async() + + # Analyze the results + print(f"Total rows: {len(results_df)}") + print(f"Columns: {results_df.columns.tolist()}") + + # Filter for valid and invalid rows + # Note: is_valid is boolean (True/False) for validated rows + valid_rows = results_df[results_df['is_valid'] == True] # noqa: E712 + invalid_rows = results_df[results_df['is_valid'] == False] # noqa: E712 + + print(f"Valid rows: {len(valid_rows)}") + print(f"Invalid rows: {len(invalid_rows)}") + + # View invalid rows with their error messages + if len(invalid_rows) > 0: + print(invalid_rows[['row_index', 'validation_error_message']]) + + asyncio.run(main()) + ``` + """ + test_import_pandas() + import pandas as pd + + client = Synapse.get_client(synapse_client=synapse_client) + + if not self.validation_file_handle_id: + client.logger.warning( + "No validation file handle ID found for this RecordSet. Cannot retrieve detailed validation results." + ) + return None + + cached_file_path = client.cache.get( + file_handle_id=self.validation_file_handle_id, path=download_location + ) + + # location in .synapseCache where the file would be corresponding to its FileHandleId + synapse_cache_location = client.cache.get_cache_dir( + file_handle_id=self.validation_file_handle_id + ) + + if download_location is not None: + # Make sure the specified download location is a fully resolved directory + download_location = ensure_download_location_is_directory(download_location) + elif cached_file_path is not None: + # file already cached so use that as the download location + download_location = os.path.dirname(cached_file_path) + else: + # file not cached and no user-specified location so default to .synapseCache + download_location = synapse_cache_location + + # Generate filename for the validation results CSV + filename = f"SYNAPSE_RECORDSET_VALIDATION_{self.validation_file_handle_id}.csv" + destination_path = os.path.join(download_location, filename) + + validation_file_path = await download_by_file_handle( + file_handle_id=self.validation_file_handle_id, + synapse_id=self.id, + entity_type="FileEntity", + destination=destination_path, + synapse_client=client, + ) + + return pd.read_csv(validation_file_path) + async def delete_async( self, version_only: Optional[bool] = False, diff --git a/synapseclient/models/submissionview.py b/synapseclient/models/submissionview.py index 1de1f2d7e..4647f489d 100644 --- a/synapseclient/models/submissionview.py +++ b/synapseclient/models/submissionview.py @@ -3,7 +3,7 @@ from copy import deepcopy from dataclasses import dataclass, field from datetime import date, datetime -from typing import Dict, List, Optional, Protocol, TypeVar, Union +from typing import Dict, List, Optional, Protocol, 
Union from typing_extensions import Self @@ -26,8 +26,6 @@ ) from synapseclient.models.table_components import Column -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - class SubmissionViewSynchronousProtocol(Protocol): """Protocol defining the synchronous interface for SubmissionView operations.""" diff --git a/synapseclient/models/table.py b/synapseclient/models/table.py index 6aa73c435..cae9cce42 100644 --- a/synapseclient/models/table.py +++ b/synapseclient/models/table.py @@ -3,7 +3,7 @@ from copy import deepcopy from dataclasses import dataclass, field from datetime import date, datetime -from typing import Any, Dict, List, Optional, Protocol, TypeVar, Union +from typing import Any, Dict, List, Optional, Protocol, Union from typing_extensions import Self @@ -12,7 +12,8 @@ from synapseclient.api import create_table_snapshot from synapseclient.core.async_utils import async_to_sync from synapseclient.core.constants import concrete_types -from synapseclient.core.utils import MB, delete_none_keys +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE +from synapseclient.core.utils import MB, delete_none_keys, test_import_pandas from synapseclient.models import Activity, Annotations from synapseclient.models.mixins import AccessControllable, BaseJSONSchema from synapseclient.models.mixins.table_components import ( @@ -34,8 +35,6 @@ ) from synapseclient.models.table_components import Column -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - class TableSynchronousProtocol(Protocol): def store( @@ -838,6 +837,7 @@ def delete_rows( Table(id="syn1234").delete_rows(query="SELECT ROW_ID, ROW_VERSION FROM syn1234 WHERE foo is null") ``` """ + test_import_pandas() from pandas import DataFrame return DataFrame() diff --git a/synapseclient/models/table_components.py b/synapseclient/models/table_components.py index 28b7e7233..fdc68356f 100644 --- a/synapseclient/models/table_components.py +++ b/synapseclient/models/table_components.py @@ -2,16 +2,7 @@ import os from dataclasses import dataclass, field, replace from enum import Enum -from typing import ( - TYPE_CHECKING, - Any, - AsyncGenerator, - Dict, - List, - Optional, - TypeVar, - Union, -) +from typing import TYPE_CHECKING, Any, AsyncGenerator, Dict, List, Optional, Union from typing_extensions import Self @@ -24,6 +15,7 @@ QUERY_TABLE_CSV_REQUEST, QUERY_TABLE_CSV_RESULT, ) +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.utils import delete_none_keys, from_unix_epoch_time from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator from synapseclient.models.protocols.table_protocol import ColumnSynchronousProtocol @@ -31,8 +23,6 @@ if TYPE_CHECKING: from synapseclient import Synapse -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - @dataclass class SumFileSizes: diff --git a/synapseclient/table.py b/synapseclient/table.py index 6b8bb79c0..685070838 100644 --- a/synapseclient/table.py +++ b/synapseclient/table.py @@ -34,7 +34,7 @@ import re import tempfile from builtins import zip -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from synapseclient.core.constants import concrete_types from synapseclient.core.exceptions import SynapseError @@ -44,6 +44,7 @@ from_unix_epoch_time, id_of, itersubclasses, + test_import_pandas, ) from .entity import Entity, Folder, Project, entity_type_to_class @@ -52,6 +53,8 @@ if TYPE_CHECKING: from synapseclient import Synapse +from 
synapseclient.core.typing_utils import DataFrame as DataFrameType + aggregate_pattern = re.compile(r"(count|max|min|avg|sum)\((.+)\)") # default is STRING, only need to put the non-STRING keys in here @@ -83,8 +86,6 @@ DEFAULT_SEPARATOR = "," DEFAULT_ESCAPSE_CHAR = "\\" -DataFrameType = TypeVar("pd.DataFrame") - # This Enum is used to help users determine which Entity types they want in their view # Each item will be used to construct the viewTypeMask @@ -130,23 +131,6 @@ def _get_view_type_mask_for_deprecated_type(type): raise ValueError("The provided value is not a valid type: %s", type) -def test_import_pandas(): - try: - import pandas as pd # noqa F401 - # used to catch when pandas isn't installed - except ModuleNotFoundError: - raise ModuleNotFoundError( - """\n\nThe pandas package is required for this function!\n - Most functions in the synapseclient package don't require the - installation of pandas, but some do. Please refer to the installation - instructions at: http://pandas.pydata.org/. - \n\n\n""" - ) - # catch other errors (see SYNPY-177) - except: # noqa - raise - - def as_table_columns(values: Union[str, DataFrameType]): """ Return a list of Synapse table [Column][synapseclient.table.Column] objects diff --git a/synapseutils/describe_functions.py b/synapseutils/describe_functions.py index d54808271..422ec77a9 100644 --- a/synapseutils/describe_functions.py +++ b/synapseutils/describe_functions.py @@ -4,7 +4,8 @@ from collections import defaultdict import synapseclient -from synapseclient import Synapse, table +from synapseclient import Synapse +from synapseclient.core.utils import test_import_pandas def _open_entity_as_df(syn: Synapse, entity: str): @@ -19,7 +20,7 @@ def _open_entity_as_df(syn: Synapse, entity: str): A [Pandas DataFrame](http://pandas.pydata.org/pandas-docs/stable/api.html#dataframe) if flow of execution is successful; None if not. 
""" - table.test_import_pandas() + test_import_pandas() import pandas as pd dataset = None @@ -51,7 +52,7 @@ def _describe_wrapper(df, syn: Synapse) -> dict: Returns: See param mode """ - table.test_import_pandas() + test_import_pandas() import pandas as pd stats = defaultdict(dict) diff --git a/synapseutils/sync.py b/synapseutils/sync.py index 28ab13189..6a6c21631 100644 --- a/synapseutils/sync.py +++ b/synapseutils/sync.py @@ -9,16 +9,7 @@ import re import sys from dataclasses import dataclass -from typing import ( - TYPE_CHECKING, - Dict, - Iterable, - List, - NamedTuple, - Tuple, - TypeVar, - Union, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, NamedTuple, Tuple, Union from deprecated import deprecated from tqdm import tqdm @@ -26,7 +17,7 @@ from synapseclient import File as SynapseFile from synapseclient import Folder as SynapseFolder from synapseclient import Project as SynapseProject -from synapseclient import Synapse, table +from synapseclient import Synapse from synapseclient.api import get_entity, get_entity_id_bundle2 from synapseclient.core import utils from synapseclient.core.async_utils import wrap_async_to_sync @@ -39,6 +30,7 @@ SynapseProvenanceError, ) from synapseclient.core.transfer_bar import shared_download_progress_bar +from synapseclient.core.typing_utils import DataFrame as DATA_FRAME_TYPE from synapseclient.core.upload.multipart_upload_async import ( shared_progress_bar as upload_shared_progress_bar, ) @@ -49,6 +41,7 @@ id_of, is_synapse_id_str, is_url, + test_import_pandas, ) from synapseclient.entity import is_container from synapseclient.models import Activity, File, UsedEntity, UsedURL @@ -58,9 +51,6 @@ if TYPE_CHECKING: from synapseclient.models import Folder, Project - -DATA_FRAME_TYPE = TypeVar("pd.DataFrame") - # When new fields are added to the manifest they will also need to be added to # file.py#_determine_fields_to_ignore_in_merge REQUIRED_FIELDS = ["path", "parent"] @@ -1055,7 +1045,7 @@ async def readManifestFile_async(syn: Synapse, manifestFile: str) -> DATA_FRAME_ Returns: A pandas dataframe if the manifest is validated. 
""" - table.test_import_pandas() + test_import_pandas() import pandas as pd if manifestFile is sys.stdin: diff --git a/tests/integration/synapseclient/models/async/test_grid_async.py b/tests/integration/synapseclient/models/async/test_grid_async.py index 5cac8e674..cd16a0cf0 100644 --- a/tests/integration/synapseclient/models/async/test_grid_async.py +++ b/tests/integration/synapseclient/models/async/test_grid_async.py @@ -122,7 +122,9 @@ async def test_create_grid_session_and_reuse_session_async( # WHEN: Creating a second grid session (should reuse the existing one) created_grid2 = await grid2.create_async( - timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn + timeout=ASYNC_JOB_TIMEOUT_SEC, + synapse_client=self.syn, + attach_to_previous_session=True, ) # THEN: The same session should be reused diff --git a/tests/integration/synapseclient/models/async/test_recordset_async.py b/tests/integration/synapseclient/models/async/test_recordset_async.py index 8505fbb53..3fa342742 100644 --- a/tests/integration/synapseclient/models/async/test_recordset_async.py +++ b/tests/integration/synapseclient/models/async/test_recordset_async.py @@ -1,10 +1,12 @@ """Integration tests for the synapseclient.models.RecordSet class (async).""" +import asyncio import os import tempfile import uuid -from typing import Callable +from typing import Callable, Generator, Tuple +import pandas as pd import pytest from synapseclient import Synapse @@ -18,6 +20,8 @@ UsedEntity, UsedURL, ) +from synapseclient.models.curation import Grid +from synapseclient.services.json_schema import JsonSchemaOrganization class TestRecordSetStoreAsync: @@ -359,3 +363,377 @@ async def test_delete_validation_errors_async(self) -> None: await record_set_with_id.delete_async( version_only=True, synapse_client=self.syn ) + + +class TestRecordSetGetDetailedValidationResultsAsync: + """Tests for the RecordSet.get_detailed_validation_results_async method.""" + + @pytest.fixture(autouse=True, scope="function") + def init(self, syn: Synapse, schedule_for_cleanup: Callable[..., None]) -> None: + self.syn = syn + self.schedule_for_cleanup = schedule_for_cleanup + + @pytest.fixture(scope="function") + def create_test_schema( + self, syn: Synapse + ) -> Generator[Tuple[JsonSchemaOrganization, str, list], None, None]: + """Create a test JSON schema for RecordSet validation.""" + org_name = "recordsettest" + uuid.uuid4().hex[:6] + schema_name = "recordset.validation.schema" + + js = syn.service("json_schema") + created_org = js.create_organization(org_name) + record_set_ids = [] # Track RecordSets that need schema unbinding + + try: + # Define a schema with comprehensive validation rules to test different error types: + # 1. Required fields (id, name) + # 2. Type constraints (integer, string, number, boolean) + # 3. String constraints (minLength) + # 4. Numeric constraints (minimum, maximum) + # 5. 
Enum constraints (category must be A, B, C, or D) + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": f"https://example.com/schema/{schema_name}.json", + "title": "RecordSet Validation Schema", + "type": "object", + "properties": { + "id": { + "description": "The unique identifier", + "type": "integer", + }, + "name": { + "description": "Name of the record (min 3 characters)", + "type": "string", + "minLength": 3, + }, + "value": { + "description": "Numeric value (must be >= 0 and <= 1000)", + "type": "number", + "minimum": 0, + "maximum": 1000, + }, + "category": { + "description": "Category classification (A, B, C, or D only)", + "type": "string", + "enum": ["A", "B", "C", "D"], + }, + "active": { + "description": "Active status flag", + "type": "boolean", + }, + }, + "required": ["id", "name"], + } + + test_org = js.JsonSchemaOrganization(org_name) + created_schema = test_org.create_json_schema(schema, schema_name, "0.0.1") + yield test_org, created_schema.uri, record_set_ids + finally: + # Unbind schema from any RecordSets before deleting + for record_set_id in record_set_ids: + try: + record_set = RecordSet(id=record_set_id) + record_set.unbind_schema(synapse_client=syn) + except Exception: + pass # Ignore errors if already unbound or deleted + + try: + js.delete_json_schema(created_schema.uri) + except Exception: + pass # Ignore if schema can't be deleted + + try: + js.delete_organization(created_org["id"]) + except Exception: + pass # Ignore if org can't be deleted + + @pytest.fixture(scope="function") + async def record_set_with_validation_fixture( + self, + project_model: Project, + create_test_schema: Tuple[JsonSchemaOrganization, str, list], + ) -> RecordSet: + """Create and store a RecordSet with schema bound, then export via Grid to generate validation results.""" + from tests.integration import ASYNC_JOB_TIMEOUT_SEC + + _, schema_uri, record_set_ids = create_test_schema + + # Create test data with multiple types of validation errors: + # Row 1: VALID - all fields correct + # Row 2: VALID - all fields correct + # Row 3: INVALID - missing required 'name' field (None value) + # Row 4: INVALID - multiple violations: + # - name too short ("AB" < minLength of 3) + # - value exceeds maximum (1500 > 1000) + # - category not in enum ("X" not in [A, B, C, D]) + # Row 5: INVALID - value below minimum (-50 < 0) + test_data = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "name": [ + "Alpha", + "Beta", + None, + "AB", + "Epsilon", + ], # Row 3: None, Row 4: too short + "value": [ + 10.5, + 150.3, + 30.7, + 1500.0, + -50.0, + ], # Row 4: too high, Row 5: negative + "category": ["A", "B", "A", "X", "B"], # Row 4: invalid enum value + "active": [True, False, True, True, False], + } + ) + + # Create a temporary CSV file + temp_fd, filename = tempfile.mkstemp(suffix=".csv") + try: + os.close(temp_fd) # Close the file descriptor + test_data.to_csv(filename, index=False) + self.schedule_for_cleanup(filename) + + # Create and store the RecordSet + record_set = RecordSet( + path=filename, + name=str(uuid.uuid4()), + description="Test RecordSet for validation testing", + version_comment="Validation test version", + version_label=str(uuid.uuid4()), + upsert_keys=["id", "name"], + ) + + stored_record_set = await record_set.store_async( + parent=project_model, synapse_client=self.syn + ) + self.schedule_for_cleanup(stored_record_set.id) + record_set_ids.append(stored_record_set.id) # Track for schema cleanup + + await asyncio.sleep(10) + + # Bind the JSON schema to the 
RecordSet + await stored_record_set.bind_schema_async( + json_schema_uri=schema_uri, + enable_derived_annotations=False, + synapse_client=self.syn, + ) + + # Verify the schema is bound by getting the schema from the entity + await stored_record_set.get_schema_async(synapse_client=self.syn) + + # Wait for schema binding to be fully processed by backend + await asyncio.sleep(10) + + # Create a Grid session from the RecordSet + grid = Grid(record_set_id=stored_record_set.id) + created_grid = await grid.create_async( + timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn + ) + + await asyncio.sleep(10) + + # Export the Grid back to RecordSet to generate validation results + exported_grid = await created_grid.export_to_record_set_async( + timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn + ) + + # Clean up the Grid session + await exported_grid.delete_async(synapse_client=self.syn) + + # Re-fetch the RecordSet to get the updated validation_file_handle_id + updated_record_set = await RecordSet(id=stored_record_set.id).get_async( + synapse_client=self.syn + ) + + return updated_record_set + except Exception: + # Clean up the temp file if something goes wrong + if os.path.exists(filename): + os.unlink(filename) + raise + + async def test_get_validation_results_no_file_handle_id_async( + self, project_model: Project + ) -> None: + # GIVEN a RecordSet without a validation_file_handle_id + filename = utils.make_bogus_uuid_file() + self.schedule_for_cleanup(filename) + + record_set = await RecordSet( + name=str(uuid.uuid4()), + path=filename, + description="RecordSet without validation", + parent_id=project_model.id, + upsert_keys=["id"], + ).store_async(synapse_client=self.syn) + self.schedule_for_cleanup(record_set.id) + + # WHEN I try to get detailed validation results + result = await record_set.get_detailed_validation_results_async( + synapse_client=self.syn + ) + + # THEN it should return None and log a warning + assert result is None + assert record_set.validation_file_handle_id is None + + async def test_get_validation_results_with_default_location_async( + self, record_set_with_validation_fixture: RecordSet + ) -> None: + # GIVEN a RecordSet with validation results + record_set = record_set_with_validation_fixture + + # WHEN I get detailed validation results without specifying a location + results_df = await record_set.get_detailed_validation_results_async( + synapse_client=self.syn + ) + + # THEN it should return a pandas DataFrame + assert results_df is not None + assert isinstance(results_df, pd.DataFrame) + # The validation results file should be downloaded to the cache + assert record_set.validation_file_handle_id is not None + + # AND the DataFrame should contain expected columns for validation results + expected_columns = [ + "row_index", + "is_valid", + "validation_error_message", + "all_validation_messages", + ] + for col in expected_columns: + assert ( + col in results_df.columns + ), f"Expected column '{col}' not found in validation results" + + # AND there should be 5 rows (one per data row) + assert ( + len(results_df) == 5 + ), f"Expected 5 rows in validation results, got {len(results_df)}" + + # AND rows 0 and 1 should be valid (is_valid == True) + assert ( + results_df.loc[0, "is_valid"] == True + ), "Row 0 should be valid" # noqa: E712 + assert ( + results_df.loc[1, "is_valid"] == True + ), "Row 1 should be valid" # noqa: E712 + assert pd.isna( + results_df.loc[0, "validation_error_message"] + ), "Row 0 should have no error message" + assert pd.isna( + 
results_df.loc[1, "validation_error_message"] + ), "Row 1 should have no error message" + + # AND row 2 should be invalid (missing required 'name' field) + assert ( + results_df.loc[2, "is_valid"] == False + ), "Row 2 should be invalid (missing required name)" # noqa: E712 + assert ( + "expected type: String, found: Null" + in results_df.loc[2, "validation_error_message"] + ), f"Row 2 should have null type error, got: {results_df.loc[2, 'validation_error_message']}" + assert "#/name: expected type: String, found: Null" in str( + results_df.loc[2, "all_validation_messages"] + ), f"Row 2 all_validation_messages incorrect: {results_df.loc[2, 'all_validation_messages']}" + + # AND row 3 should be invalid (multiple violations: minLength, maximum, enum) + assert ( + results_df.loc[3, "is_valid"] == False + ), "Row 3 should be invalid (multiple violations)" # noqa: E712 + assert ( + "3 schema violations found" in results_df.loc[3, "validation_error_message"] + ), f"Row 3 should have 3 violations, got: {results_df.loc[3, 'validation_error_message']}" + all_msgs_3 = str(results_df.loc[3, "all_validation_messages"]) + assert ( + "#/name: expected minLength: 3, actual: 2" in all_msgs_3 + ), f"Row 3 should have minLength violation: {all_msgs_3}" + assert ( + "#/value: 1500 is not less or equal to 1000" in all_msgs_3 + or "1500" in all_msgs_3 + ), f"Row 3 should have maximum violation: {all_msgs_3}" + assert ( + "#/category: X is not a valid enum value" in all_msgs_3 + or "enum" in all_msgs_3.lower() + ), f"Row 3 should have enum violation: {all_msgs_3}" + + # AND row 4 should be invalid (value below minimum) + assert ( + results_df.loc[4, "is_valid"] == False + ), "Row 4 should be invalid (value below minimum)" # noqa: E712 + assert ( + "-50 is not greater or equal to 0" + in results_df.loc[4, "validation_error_message"] + ), f"Row 4 should have minimum violation, got: {results_df.loc[4, 'validation_error_message']}" + assert "#/value: -50 is not greater or equal to 0" in str( + results_df.loc[4, "all_validation_messages"] + ), f"Row 4 all_validation_messages incorrect: {results_df.loc[4, 'all_validation_messages']}" + + async def test_get_validation_results_with_custom_location_async( + self, record_set_with_validation_fixture: RecordSet + ) -> None: + # GIVEN a RecordSet with validation results + record_set = record_set_with_validation_fixture + + # AND a custom download location + custom_location = tempfile.mkdtemp() + self.schedule_for_cleanup(custom_location) + + # WHEN I get detailed validation results with a custom location + results_df = await record_set.get_detailed_validation_results_async( + download_location=custom_location, synapse_client=self.syn + ) + + # THEN it should return a pandas DataFrame + assert results_df is not None + assert isinstance(results_df, pd.DataFrame) + + # AND the file should be downloaded to the custom location + expected_filename = ( + f"SYNAPSE_RECORDSET_VALIDATION_{record_set.validation_file_handle_id}.csv" + ) + expected_path = os.path.join(custom_location, expected_filename) + assert os.path.exists(expected_path) + + # AND the DataFrame should contain validation result columns + assert "row_index" in results_df.columns + assert "is_valid" in results_df.columns + assert "validation_error_message" in results_df.columns + assert "all_validation_messages" in results_df.columns + + # Expected behavior: 3 invalid rows (rows 2, 3, 4) and 2 valid rows (rows 0, 1) + invalid_rows = results_df[results_df["is_valid"] == False] # noqa: E712 + assert ( + len(invalid_rows) 
+        assert (
+            len(invalid_rows) == 3
+        ), f"Expected 3 invalid rows, got {len(invalid_rows)}"
+
+        valid_rows = results_df[results_df["is_valid"] == True]  # noqa: E712
+        assert len(valid_rows) == 2, f"Expected 2 valid rows, got {len(valid_rows)}"
+
+        # All invalid rows should have validation error messages
+        for idx, row in invalid_rows.iterrows():
+            assert pd.notna(
+                row["validation_error_message"]
+            ), f"Row {idx} is marked invalid but has no validation_error_message"
+            assert pd.notna(
+                row["all_validation_messages"]
+            ), f"Row {idx} is marked invalid but has no all_validation_messages"
+
+    async def test_get_validation_results_validation_error_async(self) -> None:
+        # GIVEN a RecordSet without an ID
+        record_set = RecordSet()
+
+        # Note: the method does not explicitly validate that an ID is set; without a
+        # populated validation_file_handle_id there is nothing to download, so it
+        # returns None. This test documents that behavior.
+        result = await record_set.get_detailed_validation_results_async(
+            synapse_client=self.syn
+        )
+
+        # THEN it should return None since there's no validation_file_handle_id
+        assert result is None
diff --git a/tests/integration/synapseclient/models/synchronous/test_grid.py b/tests/integration/synapseclient/models/synchronous/test_grid.py
index 019e25a3b..bc52f5e83 100644
--- a/tests/integration/synapseclient/models/synchronous/test_grid.py
+++ b/tests/integration/synapseclient/models/synchronous/test_grid.py
@@ -112,7 +112,9 @@ def test_create_grid_session_and_reuse_session(
         grid2 = Grid(record_set_id=record_set_fixture.id)
 
         # WHEN: Creating a second grid session (should reuse the existing one)
-        created_grid2 = grid2.create(synapse_client=self.syn)
+        created_grid2 = grid2.create(
+            synapse_client=self.syn, attach_to_previous_session=True
+        )
 
         # THEN: The same session should be reused
         assert created_grid2.session_id == first_session_id
diff --git a/tests/integration/synapseclient/models/synchronous/test_recordset.py b/tests/integration/synapseclient/models/synchronous/test_recordset.py
index 10337ec95..52247d360 100644
--- a/tests/integration/synapseclient/models/synchronous/test_recordset.py
+++ b/tests/integration/synapseclient/models/synchronous/test_recordset.py
@@ -1,8 +1,12 @@
 """Integration tests for the synapseclient.models.RecordSet class."""
 
+import os
+import tempfile
+import time
 import uuid
-from typing import Callable
+from typing import Callable, Generator, Tuple
 
+import pandas as pd
 import pytest
 
 from synapseclient import Synapse
@@ -16,6 +20,8 @@
     UsedEntity,
     UsedURL,
 )
+from synapseclient.models.curation import Grid
+from synapseclient.services.json_schema import JsonSchemaOrganization
 
 
 class TestRecordSetStore:
@@ -345,3 +351,380 @@ def test_delete_validation_errors(self) -> None:
         record_set_with_id = RecordSet(id="syn123456")
         with pytest.raises(ValueError):
             record_set_with_id.delete(version_only=True, synapse_client=self.syn)
+
+
+class TestRecordSetGetDetailedValidationResults:
+    """Tests for the RecordSet.get_detailed_validation_results method."""
+
+    @pytest.fixture(autouse=True, scope="function")
+    def init(self, syn: Synapse, schedule_for_cleanup: Callable[..., None]) -> None:
+        self.syn = syn
+        self.schedule_for_cleanup = schedule_for_cleanup
+
+    @pytest.fixture(scope="function")
+    def create_test_schema(
+        self, syn: Synapse
+    ) -> Generator[Tuple[JsonSchemaOrganization, str, list], None, None]:
+        """Create a test JSON schema for RecordSet validation."""
+        org_name = "recordsettest" + uuid.uuid4().hex[:6]
schema_name = "recordset.validation.schema" + + js = syn.service("json_schema") + created_org = js.create_organization(org_name) + record_set_ids = [] # Track RecordSets that need schema unbinding + + try: + # Define a schema with comprehensive validation rules to test different error types: + # 1. Required fields (id, name) + # 2. Type constraints (integer, string, number, boolean) + # 3. String constraints (minLength) + # 4. Numeric constraints (minimum, maximum) + # 5. Enum constraints (category must be A, B, C, or D) + schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": f"https://example.com/schema/{schema_name}.json", + "title": "RecordSet Validation Schema", + "type": "object", + "properties": { + "id": { + "description": "The unique identifier", + "type": "integer", + }, + "name": { + "description": "Name of the record (min 3 characters)", + "type": "string", + "minLength": 3, + }, + "value": { + "description": "Numeric value (must be >= 0 and <= 1000)", + "type": "number", + "minimum": 0, + "maximum": 1000, + }, + "category": { + "description": "Category classification (A, B, C, or D only)", + "type": "string", + "enum": ["A", "B", "C", "D"], + }, + "active": { + "description": "Active status flag", + "type": "boolean", + }, + }, + "required": ["id", "name"], + } + + test_org = js.JsonSchemaOrganization(org_name) + created_schema = test_org.create_json_schema(schema, schema_name, "0.0.1") + yield test_org, created_schema.uri, record_set_ids + finally: + # Unbind schema from any RecordSets before deleting + for record_set_id in record_set_ids: + try: + record_set = RecordSet(id=record_set_id) + record_set.unbind_schema(synapse_client=syn) + except Exception: + pass # Ignore errors if already unbound or deleted + + try: + js.delete_json_schema(created_schema.uri) + except Exception: + pass # Ignore if schema can't be deleted + + try: + js.delete_organization(created_org["id"]) + except Exception: + pass # Ignore if org can't be deleted + + @pytest.fixture(scope="function") + def record_set_with_validation_fixture( + self, + project_model: Project, + create_test_schema: Tuple[JsonSchemaOrganization, str, list], + ) -> RecordSet: + """Create and store a RecordSet with schema bound, then export via Grid to generate validation results.""" + from tests.integration import ASYNC_JOB_TIMEOUT_SEC + + _, schema_uri, record_set_ids = create_test_schema + + # Create test data with multiple types of validation errors: + # Row 1: VALID - all fields correct + # Row 2: VALID - all fields correct + # Row 3: INVALID - missing required 'name' field (None value) + # Row 4: INVALID - multiple violations: + # - name too short ("AB" < minLength of 3) + # - value exceeds maximum (1500 > 1000) + # - category not in enum ("X" not in [A, B, C, D]) + # Row 5: INVALID - value below minimum (-50 < 0) + test_data = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "name": [ + "Alpha", + "Beta", + None, + "AB", + "Epsilon", + ], # Row 3: None, Row 4: too short + "value": [ + 10.5, + 150.3, + 30.7, + 1500.0, + -50.0, + ], # Row 4: too high, Row 5: negative + "category": ["A", "B", "A", "X", "B"], # Row 4: invalid enum value + "active": [True, False, True, True, False], + } + ) + + # Create a temporary CSV file + temp_fd, filename = tempfile.mkstemp(suffix=".csv") + try: + os.close(temp_fd) # Close the file descriptor + test_data.to_csv(filename, index=False) + self.schedule_for_cleanup(filename) + + # Create and store the RecordSet + record_set = RecordSet( + path=filename, + 
+                name=str(uuid.uuid4()),
+                description="Test RecordSet for validation testing",
+                version_comment="Validation test version",
+                version_label=str(uuid.uuid4()),
+                upsert_keys=["id", "name"],
+            )
+
+            stored_record_set = record_set.store(
+                parent=project_model, synapse_client=self.syn
+            )
+            self.schedule_for_cleanup(stored_record_set.id)
+            record_set_ids.append(stored_record_set.id)  # Track for schema cleanup
+
+            time.sleep(10)
+
+            # Bind the JSON schema to the RecordSet
+            stored_record_set.bind_schema(
+                json_schema_uri=schema_uri,
+                enable_derived_annotations=False,
+                synapse_client=self.syn,
+            )
+
+            time.sleep(10)
+
+            # Verify the schema is bound by getting the schema from the entity
+            stored_record_set.get_schema(synapse_client=self.syn)
+
+            # Create a Grid session from the RecordSet
+            grid = Grid(record_set_id=stored_record_set.id)
+            created_grid = grid.create(
+                timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
+            )
+
+            time.sleep(10)
+
+            # Export the Grid back to RecordSet to generate validation results
+            exported_grid = created_grid.export_to_record_set(
+                timeout=ASYNC_JOB_TIMEOUT_SEC, synapse_client=self.syn
+            )
+
+            # Clean up the Grid session
+            exported_grid.delete(synapse_client=self.syn)
+
+            # Re-fetch the RecordSet to get the updated validation_file_handle_id
+            updated_record_set = RecordSet(id=stored_record_set.id).get(
+                synapse_client=self.syn
+            )
+
+            return updated_record_set
+        except Exception:
+            # Clean up the temp file if something goes wrong
+            if os.path.exists(filename):
+                os.unlink(filename)
+            raise
+
+    def test_get_validation_results_no_file_handle_id(
+        self, project_model: Project
+    ) -> None:
+        # GIVEN a RecordSet without a validation_file_handle_id
+        filename = utils.make_bogus_uuid_file()
+        self.schedule_for_cleanup(filename)
+
+        record_set = RecordSet(
+            name=str(uuid.uuid4()),
+            path=filename,
+            description="RecordSet without validation",
+            parent_id=project_model.id,
+            upsert_keys=["id"],
+        ).store(synapse_client=self.syn)
+        self.schedule_for_cleanup(record_set.id)
+
+        # WHEN I try to get detailed validation results
+        result = record_set.get_detailed_validation_results(synapse_client=self.syn)
+
+        # THEN it should return None and log a warning
+        assert result is None
+        assert record_set.validation_file_handle_id is None
+
+    def test_get_validation_results_with_default_location(
+        self, record_set_with_validation_fixture: RecordSet
+    ) -> None:
+        # GIVEN a RecordSet with validation results
+        record_set = record_set_with_validation_fixture
+
+        # WHEN I get detailed validation results without specifying a location
+        results_df = record_set.get_detailed_validation_results(synapse_client=self.syn)
+
+        # THEN it should return a pandas DataFrame
+        assert results_df is not None
+        assert isinstance(results_df, pd.DataFrame)
+        # The validation results file should be downloaded to the cache
+        assert record_set.validation_file_handle_id is not None
+
+        # AND the DataFrame should contain expected columns for validation results
+        expected_columns = [
+            "row_index",
+            "is_valid",
+            "validation_error_message",
+            "all_validation_messages",
+        ]
+        for col in expected_columns:
+            assert (
+                col in results_df.columns
+            ), f"Expected column '{col}' not found in validation results"
+
+        # AND there should be 5 rows (one per data row)
+        assert (
+            len(results_df) == 5
+        ), f"Expected 5 rows in validation results, got {len(results_df)}"
+
+        # Debug output: print the validation results to help diagnose failures
+        print("\n=== Debug: Validation Results ===")
{results_df.shape}") + print(f"DataFrame dtypes:\n{results_df.dtypes}") + print("\nValidation results:") + print(results_df.to_string()) + print(f"\nis_valid column unique values: {results_df['is_valid'].unique()}") + print(f"is_valid column dtype: {results_df['is_valid'].dtype}") + print("=== End Debug ===") + + # AND rows 0 and 1 should be valid (is_valid == True) + assert ( + results_df.loc[0, "is_valid"] == True + ), "Row 0 should be valid" # noqa: E712 + assert ( + results_df.loc[1, "is_valid"] == True + ), "Row 1 should be valid" # noqa: E712 + assert pd.isna( + results_df.loc[0, "validation_error_message"] + ), "Row 0 should have no error message" + assert pd.isna( + results_df.loc[1, "validation_error_message"] + ), "Row 1 should have no error message" + + # AND row 2 should be invalid (missing required 'name' field) + assert ( + results_df.loc[2, "is_valid"] == False + ), "Row 2 should be invalid (missing required name)" # noqa: E712 + assert ( + "expected type: String, found: Null" + in results_df.loc[2, "validation_error_message"] + ), f"Row 2 should have null type error, got: {results_df.loc[2, 'validation_error_message']}" + assert "#/name: expected type: String, found: Null" in str( + results_df.loc[2, "all_validation_messages"] + ), f"Row 2 all_validation_messages incorrect: {results_df.loc[2, 'all_validation_messages']}" + + # AND row 3 should be invalid (multiple violations: minLength, maximum, enum) + assert ( + results_df.loc[3, "is_valid"] == False + ), "Row 3 should be invalid (multiple violations)" # noqa: E712 + assert ( + "3 schema violations found" in results_df.loc[3, "validation_error_message"] + ), f"Row 3 should have 3 violations, got: {results_df.loc[3, 'validation_error_message']}" + all_msgs_3 = str(results_df.loc[3, "all_validation_messages"]) + assert ( + "#/name: expected minLength: 3, actual: 2" in all_msgs_3 + ), f"Row 3 should have minLength violation: {all_msgs_3}" + assert ( + "#/value: 1500 is not less or equal to 1000" in all_msgs_3 + or "1500" in all_msgs_3 + ), f"Row 3 should have maximum violation: {all_msgs_3}" + assert ( + "#/category: X is not a valid enum value" in all_msgs_3 + or "enum" in all_msgs_3.lower() + ), f"Row 3 should have enum violation: {all_msgs_3}" + + # AND row 4 should be invalid (value below minimum) + assert ( + results_df.loc[4, "is_valid"] == False + ), "Row 4 should be invalid (value below minimum)" # noqa: E712 + assert ( + "-50 is not greater or equal to 0" + in results_df.loc[4, "validation_error_message"] + ), f"Row 4 should have minimum violation, got: {results_df.loc[4, 'validation_error_message']}" + assert "#/value: -50 is not greater or equal to 0" in str( + results_df.loc[4, "all_validation_messages"] + ), f"Row 4 all_validation_messages incorrect: {results_df.loc[4, 'all_validation_messages']}" + + def test_get_validation_results_with_custom_location( + self, record_set_with_validation_fixture: RecordSet + ) -> None: + # GIVEN a RecordSet with validation results + record_set = record_set_with_validation_fixture + + # AND a custom download location + custom_location = tempfile.mkdtemp() + self.schedule_for_cleanup(custom_location) + + # WHEN I get detailed validation results with a custom location + results_df = record_set.get_detailed_validation_results( + download_location=custom_location, synapse_client=self.syn + ) + + # THEN it should return a pandas DataFrame + assert results_df is not None + assert isinstance(results_df, pd.DataFrame) + + # AND the file should be downloaded to the custom location + 
+        expected_filename = (
+            f"SYNAPSE_RECORDSET_VALIDATION_{record_set.validation_file_handle_id}.csv"
+        )
+        expected_path = os.path.join(custom_location, expected_filename)
+        assert os.path.exists(expected_path)
+
+        # AND the DataFrame should contain validation result columns
+        assert "row_index" in results_df.columns
+        assert "is_valid" in results_df.columns
+        assert "validation_error_message" in results_df.columns
+        assert "all_validation_messages" in results_df.columns
+
+        # Expected behavior: 3 invalid rows (rows 2, 3, 4) and 2 valid rows (rows 0, 1)
+        invalid_rows = results_df[results_df["is_valid"] == False]  # noqa: E712
+        assert (
+            len(invalid_rows) == 3
+        ), f"Expected 3 invalid rows, got {len(invalid_rows)}"
+
+        valid_rows = results_df[results_df["is_valid"] == True]  # noqa: E712
+        assert len(valid_rows) == 2, f"Expected 2 valid rows, got {len(valid_rows)}"
+
+        # All invalid rows should have validation error messages
+        for idx, row in invalid_rows.iterrows():
+            assert pd.notna(
+                row["validation_error_message"]
+            ), f"Row {idx} is marked invalid but has no validation_error_message"
+            assert pd.notna(
+                row["all_validation_messages"]
+            ), f"Row {idx} is marked invalid but has no all_validation_messages"
+
+    def test_get_validation_results_validation_error(self) -> None:
+        # GIVEN a RecordSet without an ID
+        record_set = RecordSet()
+
+        # Note: the method does not explicitly validate that an ID is set; without a
+        # populated validation_file_handle_id there is nothing to download, so it
+        # returns None. This test documents that behavior.
+        result = record_set.get_detailed_validation_results(synapse_client=self.syn)
+
+        # THEN it should return None since there's no validation_file_handle_id
+        assert result is None