diff --git a/igf_airflow/utils/dag49_cosmx_metadata_registration_utils.py b/igf_airflow/utils/dag49_cosmx_metadata_registration_utils.py
new file mode 100644
index 00000000..31d44dd6
--- /dev/null
+++ b/igf_airflow/utils/dag49_cosmx_metadata_registration_utils.py
@@ -0,0 +1,124 @@
+import logging
+from datetime import timedelta
+from airflow.models import Variable
+from airflow.decorators import task
+from airflow.operators.python import get_current_context
+from igf_airflow.utils.generic_airflow_utils import (
+    send_airflow_failed_logs_to_channels
+)
+from igf_data.process.seqrun_processing.unified_metadata_registration import (
+    UnifiedMetadataRegistration
+)
+
+log = logging.getLogger(__name__)
+
+MS_TEAMS_CONF = Variable.get(
+    'analysis_ms_teams_conf',
+    default_var=None
+)
+DATABASE_CONFIG_FILE = Variable.get(
+    'database_config_file',
+    default_var=None
+)
+IGF_PORTAL_CONF = Variable.get(
+    'igf_portal_conf',
+    default_var=None
+)
+FETCH_METADATA_URL_SUFFIX = Variable.get(
+    'cosmx_metadata_fetch_metadata_url',
+    default_var='/api/v1/raw_cosmx_metadata/get_raw_metadata/'
+)
+SYNC_METADATA_URL_SUFFIX = Variable.get(
+    'cosmx_metadata_sync_metadata_url',
+    default_var='/api/v1/raw_cosmx_metadata/mark_ready_metadata_as_synced'
+)
+# NOTE(review): this reads the same Variable key as IGF_PORTAL_CONF
+# ('igf_portal_conf'), which looks like a copy-paste slip - confirm which
+# Variable holds the metadata validation schema before merging.
+METADATA_VALIDATION_SCHEMA = Variable.get(
+    'igf_portal_conf',
+    default_var=None
+)
+DEFAULT_EMAIL = Variable.get(
+    'default_email',
+    default_var=None
+)
+
+## TASK - find raw metadata id in dag_run.conf
+@task(
+    task_id="find_raw_metadata_id",
+    retry_delay=timedelta(minutes=5),
+    retries=4,
+    queue='hpc_4G'
+)
+def find_raw_metadata_id(
+    raw_cosmx_metadata_id_tag: str = "raw_cosmx_metadata_id",
+    dag_run_key: str = "dag_run"
+) -> int:
+    """Return the raw CosMx metadata id stored in dag_run.conf.
+
+    :param raw_cosmx_metadata_id_tag: Key to look up in dag_run.conf
+    :param dag_run_key: Key of the dag run entry in the task context
+    :returns: The configured id, converted to int
+    :raises ValueError: If the id is missing or cannot be read; the failure
+        is logged and passed to send_airflow_failed_logs_to_channels first
+    """
+    try:
+        ### dag_run.conf should have raw_cosmx_metadata_id
+        context = get_current_context()
+        dag_run = context.get(dag_run_key)
+        raw_cosmx_metadata_id = None
+        if dag_run is not None and dag_run.conf is not None:
+            raw_cosmx_metadata_id = dag_run.conf.get(
+                raw_cosmx_metadata_id_tag
+            )
+        if raw_cosmx_metadata_id is None:
+            raise ValueError(
+                f'{raw_cosmx_metadata_id_tag} not found in dag_run.conf'
+            )
+        return int(raw_cosmx_metadata_id)
+    except Exception as e:
+        message = f"Failed to get raw_cosmx_metadata_id, error: {e}"
+        log.error(message)
+        send_airflow_failed_logs_to_channels(
+            ms_teams_conf=MS_TEAMS_CONF,
+            message_prefix=str(message)
+        )
+        raise ValueError(message) from e
+
+## TASK - register metadata for the raw cosmx metadata id
+@task(
+    task_id="register_cosmx_metadata",
+    retry_delay=timedelta(minutes=5),
+    retries=4,
+    queue='hpc_4G'
+)
+def register_cosmx_metadata(
+    raw_cosmx_metadata_id: int
+) -> None:
+    """Run UnifiedMetadataRegistration for one raw CosMx metadata id.
+
+    :param raw_cosmx_metadata_id: Id of the raw CosMx metadata entry
+    :raises ValueError: If registration fails; the failure is logged and
+        passed to send_airflow_failed_logs_to_channels first
+    """
+    try:
+        metadata_registration = UnifiedMetadataRegistration(
+            raw_cosmx_metadata_id=raw_cosmx_metadata_id,
+            portal_config_file=IGF_PORTAL_CONF,
+            fetch_metadata_url_suffix=FETCH_METADATA_URL_SUFFIX,
+            sync_metadata_url_suffix=SYNC_METADATA_URL_SUFFIX,
+            metadata_validation_schema=METADATA_VALIDATION_SCHEMA,
+            db_config_file=DATABASE_CONFIG_FILE,
+            default_project_user_email=DEFAULT_EMAIL,
+            samples_required=False
+        )
+        metadata_registration.execute()
+    except Exception as e:
+        message = f"Failed to register new cosmx metadata, error: {e}"
+        log.error(message)
+        send_airflow_failed_logs_to_channels(
+            ms_teams_conf=MS_TEAMS_CONF,
+            message_prefix=str(message)
+        )
+        raise ValueError(message) from e
diff --git a/igf_data/process/seqrun_processing/unified_metadata_registration.py b/igf_data/process/seqrun_processing/unified_metadata_registration.py
index d58dcb82..4603a545 100644
--- a/igf_data/process/seqrun_processing/unified_metadata_registration.py
+++ b/igf_data/process/seqrun_processing/unified_metadata_registration.py
@@ -37,6 +37,7 @@ def _get_db_session_class(db_config_file: str) -> Any:
 class MetadataContext:
   def __init__(
       self,
+      raw_cosmx_metadata_id: int,
       portal_config_file: str,
       fetch_metadata_url_suffix: str,
       sync_metadata_url_suffix: str,
@@ -57,6 +58,7 @@ def __init__(
         "sample": ["sample_igf_id", "project_igf_id"]},
       error_list: List[str] = []) \
         -> None:
+    self.raw_cosmx_metadata_id = raw_cosmx_metadata_id
     self.portal_config_file = portal_config_file
     self.fetch_metadata_url_suffix = fetch_metadata_url_suffix
     self.sync_metadata_url_suffix = sync_metadata_url_suffix
@@ -90,9 +92,14 @@ def execute(self, metadata_context: MetadataContext) -> None:
     try:
       portal_config_file = metadata_context.portal_config_file
       fetch_metadata_url_suffix = metadata_context.fetch_metadata_url_suffix
+      # plain join: urljoin drops the last base segment without a trailing "/"
+      fetch_url = "{0}/{1}".format(
+        fetch_metadata_url_suffix.rstrip("/"),
+        metadata_context.raw_cosmx_metadata_id
+      )
       new_project_data_dict = \
         get_data_from_portal(
-          url_suffix=fetch_metadata_url_suffix,
+          url_suffix=fetch_url,
           portal_config_file=portal_config_file)
       if len(new_project_data_dict) > 0:
         reformatted_project_data_dict = \
@@ -619,6 +626,7 @@ def execute(self, metadata_context: MetadataContext) -> None:
 class UnifiedMetadataRegistration:
   def __init__(
       self,
+      raw_cosmx_metadata_id: int,
       portal_config_file: str,
       fetch_metadata_url_suffix: str,
       sync_metadata_url_suffix: str,
@@ -628,6 +636,7 @@ def __init__(
       samples_required: bool = False,
     ) -> None:
     self.metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=raw_cosmx_metadata_id,
       portal_config_file=portal_config_file,
       fetch_metadata_url_suffix=fetch_metadata_url_suffix,
       sync_metadata_url_suffix=sync_metadata_url_suffix,
diff --git a/test/__init__.py b/test/__init__.py
index 2e9f144e..90730911 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -225,6 +225,9 @@ def full_suite():
     Test_dag46_scRNA_10X_flex_utilsA)
   from .igf_airflow.test_dag44_analysis_registration_utils import (
     Test_dag44_analysis_registration_utilsA)
+  from .igf_airflow.test_dag49_cosmx_metadata_registration_utils import (
+    Test_dag49_cosmx_metadata_registration_utilsA
+  )
 
 
   return unittest.TestSuite([
@@ -372,6 +375,7 @@ def full_suite():
         TestUnifiedMetadataRegistrationB,
         Test_dag46_scRNA_10X_flex_utilsA,
         Test_dag45_metadata_registration_utilsA,
-        Test_dag44_analysis_registration_utilsA
+        Test_dag44_analysis_registration_utilsA,
+        Test_dag49_cosmx_metadata_registration_utilsA
       ]
   ])
\ No newline at end of file
diff --git a/test/igf_airflow/test_dag49_cosmx_metadata_registration_utils.py b/test/igf_airflow/test_dag49_cosmx_metadata_registration_utils.py
new file mode 100644
index 00000000..03bc970c
--- /dev/null
+++ b/test/igf_airflow/test_dag49_cosmx_metadata_registration_utils.py
@@ -0,0 +1,48 @@
+import unittest
+from igf_data.utils.fileutils import (
+    get_temp_dir,
+    remove_dir
+)
+from unittest.mock import patch, MagicMock
+from igf_airflow.utils.dag49_cosmx_metadata_registration_utils import (
+    find_raw_metadata_id,
+    register_cosmx_metadata
+)
+
+class Test_dag49_cosmx_metadata_registration_utilsA(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = get_temp_dir()
+
+    def tearDown(self):
+        remove_dir(self.temp_dir)
+
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.get_current_context")
+    def test_find_raw_metadata_id(self, mock_get_context):
+        mock_context = MagicMock()
+        mock_context.dag_run.conf.raw_cosmx_metadata_id = 1
+        mock_context.get.return_value = mock_context.dag_run
+        mock_context.dag_run.conf.get.return_value = 1
+        mock_get_context.return_value = mock_context
+        raw_cosmx_metadata_id = find_raw_metadata_id.function()
+        self.assertEqual(raw_cosmx_metadata_id, 1)
+
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.DATABASE_CONFIG_FILE", "test.conf")
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.IGF_PORTAL_CONF", "test.conf")
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.METADATA_VALIDATION_SCHEMA", "test.json")
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.DEFAULT_EMAIL", "c@c.org")
+    @patch("igf_airflow.utils.dag49_cosmx_metadata_registration_utils.UnifiedMetadataRegistration")
+    def test_register_cosmx_metadata(self, mock_class, *args):
+        mock_instance = MagicMock()
+        mock_instance.execute.return_value = None
+        mock_class.return_value = mock_instance
+        register_cosmx_metadata.function(
+            raw_cosmx_metadata_id=1
+        )
+        # the id from the task argument must reach the registration class
+        mock_class.assert_called_once()
+        _, call_kwargs = mock_class.call_args
+        self.assertEqual(call_kwargs.get("raw_cosmx_metadata_id"), 1)
+        mock_instance.execute.assert_called_once()
+
+if __name__=='__main__':
+    unittest.main()
diff --git a/test/process/unified_metadata_registration_test.py b/test/process/unified_metadata_registration_test.py
index 00d777a3..43b47fe4 100644
--- a/test/process/unified_metadata_registration_test.py
+++ b/test/process/unified_metadata_registration_test.py
@@ -27,6 +27,7 @@
 
 class TestUnifiedMetadataRegistrationA(unittest.TestCase):
   def setUp(self):
+    self.raw_cosmx_metadata_id = 1
     self.portal_config_file = "test_config.json"
     self.fetch_metadata_url_suffix = "test_fetch_suffix"
     self.sync_metadata_url_suffix = "test_sync_suffix"
@@ -79,6 +80,7 @@ def tearDown(self):
 
   def test_MetadataContext(self):
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -93,6 +95,7 @@ def test_FetchNewMetadataCommand1(self, *args):
     fetch_command = FetchNewMetadataCommand()
     self.assertIsInstance(fetch_command, FetchNewMetadataCommand)
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -109,6 +112,7 @@ def test_FetchNewMetadataCommand2(self, *args):
     fetch_command = FetchNewMetadataCommand()
     self.assertIsInstance(fetch_command, FetchNewMetadataCommand)
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -163,6 +167,7 @@ def test_CheckRawMetadataColumnsCommand_check_columns(self):
 
   def test_CheckRawMetadataColumnsCommand_execute(self):
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -187,6 +192,7 @@ def test_CheckRawMetadataColumnsCommand_execute(self):
     self.assertEqual(metadata_context.checked_required_column_dict.get(2), False)
     self.assertEqual(len(metadata_context.error_list), 1)
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -245,6 +251,7 @@ def test_ValidateMetadataCommand_validate_metadata(self):
   def test_ValidateMetadataCommand_execute(self):
     validate = ValidateMetadataCommand()
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -275,6 +282,7 @@ def test_ValidateMetadataCommand_execute(self):
 
   def test_CheckAndRegisterMetadataCommand_split_metadata(self):
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -489,6 +497,7 @@ def test_CheckAndRegisterMetadataCommand_execute(self):
     check_and_register = \
       CheckAndRegisterMetadataCommand()
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -567,6 +576,7 @@ def test_CheckAndRegisterMetadataCommand_execute(self):
   @patch("igf_data.process.seqrun_processing.unified_metadata_registration.get_data_from_portal", return_value=True)
   def test_SyncMetadataCommand_execute(self, *args):
     metadata_context = MetadataContext(
+      raw_cosmx_metadata_id=self.raw_cosmx_metadata_id,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,
@@ -623,6 +633,7 @@ def tearDown(self):
       3: [{"project_igf_id": "IGFA003", "deliverable": "COSMX", "name": "User DQ", "email_id": "a-c.com", "username": "ddd"}]})
   def test_UnifiedMetadataRegistration_execute(self, *args):
     metadata_registration = UnifiedMetadataRegistration(
+      raw_cosmx_metadata_id=1,
       portal_config_file=self.portal_config_file,
       fetch_metadata_url_suffix=self.fetch_metadata_url_suffix,
       sync_metadata_url_suffix=self.sync_metadata_url_suffix,