diff --git a/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py b/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py index 9ebfd74c3..f81b3c2e5 100644 --- a/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py +++ b/igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py @@ -31,7 +31,6 @@ log = logging.getLogger(__name__) -SLACK_CONF = Variable.get('analysis_slack_conf',default_var=None) MS_TEAMS_CONF = Variable.get('analysis_ms_teams_conf',default_var=None) HPC_SSH_KEY_FILE = Variable.get('hpc_ssh_key_file', default_var=None) DATABASE_CONFIG_FILE = Variable.get('database_config_file', default_var=None) @@ -112,7 +111,6 @@ def get_analysis_group_list( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -137,7 +135,7 @@ def prepare_cellranger_script(sample_group: str, design_dict: dict) -> dict: analysis_metadata is None: raise KeyError("Missing sample or analysis metadata") work_dir = get_temp_dir(use_ephemeral_space=True) - library_csv_file, run_script_file = \ + _, run_script_file = \ prepare_cellranger_run_dir_and_script_file( sample_group=str(sample_group), work_dir=work_dir, @@ -153,7 +151,6 @@ def prepare_cellranger_script(sample_group: str, design_dict: dict) -> dict: except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -165,7 +162,9 @@ def prepare_cellranger_run_dir_and_script_file( design_file: str, db_config_file: str, run_script_template: str, - library_csv_filename: str = 'library.csv') \ + library_csv_filename: str = 'library.csv', + cellranger_group_tag: str = 'cellranger_group', + feature_types_tag: str = 'feature_types') \ -> str: try: check_file_path(design_file) @@ -178,29 +177,34 @@ def prepare_cellranger_run_dir_and_script_file( input_design_yaml=input_design_yaml) if sample_metadata is None or \ 
analysis_metadata is None: - raise KeyError("Missing sample or analysis metadata") + raise KeyError( + "Missing sample or analysis metadata") ## library info sample_library_list = \ create_library_information_for_sample_group( sample_group=sample_group, sample_metadata=sample_metadata, - db_config_file=db_config_file) + db_config_file=db_config_file, + cellranger_group_tag=cellranger_group_tag, + feature_types_tag=feature_types_tag) ## get cellranger conf cellranger_multi_config = \ - analysis_metadata.get("cellranger_multi_config") + analysis_metadata.get( + "cellranger_multi_config") if cellranger_multi_config is None: - raise KeyError("Missing cellranger_multi_config in analysis design") + raise KeyError( + "Missing cellranger_multi_config in analysis design") ## create temp dir and dump script and library.csv library_csv_file = \ os.path.join( work_dir, library_csv_filename) - sample_library_csv = \ - pd.DataFrame(sample_library_list).\ - to_csv(index=False) + sample_library_csv = ( + pd.DataFrame(sample_library_list) + .to_csv(index=False)) with open(library_csv_file, 'w') as fp: fp.write('\n'.join(cellranger_multi_config)) - fp.write('\n') ## add an empty line + fp.write('\n') fp.write('[libraries]\n') fp.write(sample_library_csv) ## create run script from template @@ -208,8 +212,6 @@ def prepare_cellranger_run_dir_and_script_file( os.path.join( work_dir, os.path.basename(run_script_template)) - # output_dir = \ - # os.path.join(work_dir, str(sample_group)) _create_output_from_jinja_template( template_file=run_script_template, output_file=script_file, @@ -228,23 +230,28 @@ def prepare_cellranger_run_dir_and_script_file( def create_library_information_for_sample_group( sample_group: str, sample_metadata: dict, - db_config_file: str) -> list: + db_config_file: str, + cellranger_group_tag: str = 'cellranger_group', + feature_types_tag: str = 'feature_types') -> list: try: ## get cellranger group sample_group_dict = dict() sample_igf_id_list = list() for 
sample_igf_id, group in sample_metadata.items(): - grp_name = group.get('cellranger_group') - feature_types = group.get('feature_types') + grp_name = group.get(cellranger_group_tag) + feature_types = group.get(feature_types_tag) if grp_name is None or feature_types is None: raise KeyError( - "Missing cellranger_group or feature_types in sample_metadata ") + "Missing cellranger_group or feature_types " + \ + "in sample_metadata") if str(grp_name) == str(sample_group): sample_igf_id_list.append(sample_igf_id) - sample_group_dict.update({ sample_igf_id: feature_types}) + sample_group_dict.update({ + sample_igf_id: feature_types}) ## get sample ids from metadata if len(sample_igf_id_list) == 0: - raise ValueError("No sample id found in the metadata") + raise ValueError( + "No sample id found in the metadata") ## get fastq files for all samples fastq_list = \ get_fastq_and_run_for_samples( @@ -256,20 +263,31 @@ def create_library_information_for_sample_group( ## create libraries section df = pd.DataFrame(fastq_list) sample_library_list = list() - for _, g_data in df.groupby(['sample_igf_id', 'run_igf_id', 'flowcell_id', 'lane_number']): - sample_igf_id = g_data['sample_igf_id'].values[0] - fastq_file_path = g_data['file_path'].values[0] - fastq_dir = os.path.dirname(fastq_file_path) - feature_types = sample_group_dict.get(sample_igf_id) + grp_columns = [ + 'sample_igf_id', + 'run_igf_id', + 'flowcell_id', + 'lane_number'] + for _, g_data in df.groupby(grp_columns): + sample_igf_id = \ + g_data['sample_igf_id'].values[0] + fastq_file_path = \ + g_data['file_path'].values[0] + fastq_dir = \ + os.path.dirname(fastq_file_path) + feature_types = \ + sample_group_dict.get(sample_igf_id) if feature_types is None: raise KeyError( - f"No feature_types found for sample {sample_igf_id}") - fastq_id = \ - os.path.basename(fastq_file_path).split("_")[0] + "No feature_types found for sample " + \ + f"{sample_igf_id}") + fastq_id = ( + os.path.basename(fastq_file_path) + 
.split("_")[0]) sample_library_list.append({ "fastq_id": fastq_id, "fastqs": fastq_dir, - "feature_types": feature_types}) + feature_types_tag: feature_types}) return sample_library_list except Exception as e: raise ValueError( @@ -299,7 +317,6 @@ def run_cellranger_script( Remove it to continue!""") try: send_airflow_pipeline_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=\ f"Started Cellranger for sample: {sample_group}, script: {run_script}") @@ -313,7 +330,6 @@ def run_cellranger_script( ## check output dir exists check_file_path(output_dir) send_airflow_pipeline_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=\ f"Finished Cellranger for sample: {sample_group}") @@ -321,7 +337,6 @@ def run_cellranger_script( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -429,7 +444,6 @@ def run_single_sample_scanpy( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -549,55 +563,11 @@ def move_single_sample_result_to_main_work_dir( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) -# ## TASK -# @task.branch( -# task_id="collect_and_branch", -# retry_delay=timedelta(minutes=5), -# retries=4, -# queue='hpc_4G') -# def collect_and_branch( -# merge_step='configure_cellranger_aggr_run', -# skip_step='calculate_md5sum_for_main_work_dir') \ -# -> list: -# try: -# cellranger_output_dict = dict() -# context = get_current_context() -# ti = context.get('ti') -# all_lazy_task_ids = \ -# context['task'].\ -# get_direct_relative_ids(upstream=True) -# lazy_xcom = ti.xcom_pull(task_ids=all_lazy_task_ids) -# for entry in lazy_xcom: -# sample_group = 
entry.get("sample_group") -# cellranger_output_dir = entry.get("cellranger_output_dir") -# if sample_group is not None and \ -# cellranger_output_dir is not None: -# ## skipping failed runs -# cellranger_output_dict.update( -# {sample_group: cellranger_output_dir}) -# if len(cellranger_output_dict) == 0: -# raise ValueError(f"No cellranger output found") -# elif len(cellranger_output_dict) == 1: -# return [skip_step] -# else: -# ti.xcom_push( -# key='cellranger_output_dict', -# value=cellranger_output_dict) -# return [merge_step] -# except Exception as e: -# log.error(e) -# send_airflow_failed_logs_to_channels( -# slack_conf=SLACK_CONF, -# ms_teams_conf=MS_TEAMS_CONF, -# message_prefix=e) -# raise ValueError(e) - @task( task_id="configure_cellranger_aggr_run", retry_delay=timedelta(minutes=5), @@ -621,49 +591,10 @@ def configure_cellranger_aggr_run( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) -# ## TASK -# @task( -# task_id="configure_cellranger_aggr_run", -# retry_delay=timedelta(minutes=5), -# retries=4, -# queue='hpc_4G') -# def configure_cellranger_aggr_run( -# xcom_pull_task_ids: str = 'collect_and_branch', -# xcom_pull_task_key: str = 'cellranger_output_dict') \ -# -> dict: -# try: -# cellranger_output_dict = dict() -# context = get_current_context() -# ti = context.get('ti') -# cellranger_output_dict = \ -# ti.xcom_pull( -# task_ids=xcom_pull_task_ids, -# key=xcom_pull_task_key) -# if cellranger_output_dict is None or \ -# (isinstance(cellranger_output_dict, dict) and len(cellranger_output_dict)) == 0: -# raise ValueError(f"No cellranger output found") -# elif len(cellranger_output_dict) == 1: -# raise ValueError(f"Single cellranger output found. 
Can't merge it!") -# else: -# output_dict = \ -# configure_cellranger_aggr( -# run_script_template=CELLRANGER_AGGR_SCRIPT_TEMPLATE, -# cellranger_output_dict=cellranger_output_dict) -# return output_dict -# except Exception as e: -# log.error(e) -# send_airflow_failed_logs_to_channels( -# slack_conf=SLACK_CONF, -# ms_teams_conf=MS_TEAMS_CONF, -# message_prefix=e) -# raise ValueError(e) - - def configure_cellranger_aggr( run_script_template: str, @@ -722,10 +653,6 @@ def configure_cellranger_aggr( def run_cellranger_aggr_script( script_dict: dict) -> str: try: - # skip_aggr = script_dict.get('skip_aggr') - # if skip_aggr is not None and skip_aggr: - # return {'output_dir': None, 'skip_aggr': True} - sample_name = script_dict.get('sample_name') run_script = script_dict.get('run_script') output_dir = script_dict.get('output_dir') try: @@ -742,7 +669,6 @@ def run_cellranger_aggr_script( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -758,12 +684,6 @@ def merged_scanpy_report( cellranger_aggr_output_dir: str, design_dict: dict) -> dict: try: - # skip_aggr = \ - # cellranger_aggr_output_dict.get('skip_aggr') - # cellranger_aggr_output_dir = \ - # cellranger_aggr_output_dict.get('output_dir') - # if skip_aggr is not None and skip_aggr: - # return {'skip_aggr': skip_aggr} sample_group = "ALL" design_file = design_dict.get('analysis_design') check_file_path(design_file) @@ -814,21 +734,6 @@ def merged_scanpy_report( 'outs', 'scanpy') os.makedirs(scanpy_dir, exist_ok=True) - ## set count matrix dir - # aggr_filtered_feature_bc_matrix_dir = \ - # os.path.join( - # cellranger_aggr_counts_dir, - # 'filtered_feature_bc_matrix') - # multi_sample_filtered_feature_bc_matrix_dir = \ - # os.path.join( - # cellranger_aggr_counts_dir, - # 'sample_filtered_feature_bc_matrix') - ## create a symlink if the multi style sample_filtered_feature_bc_matrix dir not pesent - 
# if os.path.exists(aggr_filtered_feature_bc_matrix_dir) and \ - # not os.path.exists(multi_sample_filtered_feature_bc_matrix_dir): - # os.symlink( - # aggr_filtered_feature_bc_matrix_dir, - # multi_sample_filtered_feature_bc_matrix_dir) output_notebook_path, scanpy_h5ad = \ prepare_and_run_scanpy_notebook( project_igf_id=project_igf_id, @@ -861,7 +766,6 @@ def merged_scanpy_report( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -879,10 +783,6 @@ def move_aggr_result_to_main_work_dir( scanpy_aggr_output_dict: dict ) -> dict: try: - # skip_aggr = \ - # scanpy_aggr_output_dict.get('skip_aggr') - # if skip_aggr is not None and skip_aggr: - # return main_work_dir check_file_path(main_work_dir) cellranger_output_dir = \ scanpy_aggr_output_dict.get("cellranger_output_dir") @@ -904,7 +804,6 @@ def move_aggr_result_to_main_work_dir( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -947,7 +846,6 @@ def load_cellranger_results_to_db( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) @@ -970,7 +868,6 @@ def decide_aggr( except Exception as e: log.error(e) send_airflow_failed_logs_to_channels( - slack_conf=SLACK_CONF, ms_teams_conf=MS_TEAMS_CONF, message_prefix=e) raise ValueError(e) \ No newline at end of file diff --git a/igf_airflow/utils/dag44_analysis_registration_utils.py b/igf_airflow/utils/dag44_analysis_registration_utils.py index 0c8817c0a..6d45253a5 100644 --- a/igf_airflow/utils/dag44_analysis_registration_utils.py +++ b/igf_airflow/utils/dag44_analysis_registration_utils.py @@ -42,12 +42,11 @@ task_id="find_raw_metadata_id", retry_delay=timedelta(minutes=5), retries=4, - queue='hpc_4G', - multiple_outputs=False) 
+ queue='hpc_4G') def find_raw_metadata_id( raw_analysis_id_tag: str = "raw_analysis_id", dag_run_key: str = "dag_run") \ - -> Dict[str, int]: + -> int: try: ### dag_run.conf should have raw_analysis_id context = get_current_context() @@ -61,7 +60,7 @@ def find_raw_metadata_id( if raw_analysis_id is None: raise ValueError( 'raw_analysis_id not found in dag_run.conf') - return {raw_analysis_id_tag: raw_analysis_id} + return int(raw_analysis_id) except Exception as e: message = \ f"Failed to get raw_analysis_id, error: {e}" @@ -74,12 +73,10 @@ def find_raw_metadata_id( ## TASK - fetch raw analysis metadata from portal @task( task_id="fetch_raw_metadata_from_portal", - retries=0, - queue='hpc_4G', - multiple_outputs=False) + retries=1, + queue='hpc_4G') def fetch_raw_metadata_from_portal( - raw_analysis_id: int, - raw_metadata_file_tag: str = "raw_metadata_file") -> Dict[str, str]: + raw_analysis_id: int) -> str: try: raw_analysis_data = \ get_data_from_portal( @@ -101,7 +98,7 @@ def fetch_raw_metadata_from_portal( os.path.join(temp_dir, "raw_metadata.json") with open(raw_metadata_json_file, "w") as fp: json.dump(raw_analysis_data, fp) - return {raw_metadata_file_tag: raw_metadata_json_file} + return raw_metadata_json_file except Exception as e: message = \ f"Failed to fetch raw_analysis_metadata, error: {e}" @@ -141,12 +138,10 @@ def check_registered_analysis_in_db( task_id="check_raw_metadata_in_db", retry_delay=timedelta(minutes=5), retries=4, - queue='hpc_4G', - multiple_outputs=False) + queue='hpc_4G') def check_raw_metadata_in_db( - raw_metadata_file: str, - valid_raw_metadata_file_tag: str = "valid_raw_metadata_file") \ - -> Dict[str, str]: + raw_metadata_file: str) \ + -> str: try: check_file_path(raw_metadata_file) with open(raw_metadata_file, "r") as fp: @@ -166,10 +161,9 @@ def check_raw_metadata_in_db( project_id=project_id, analysis_name=analysis_name, dbconf_json=DATABASE_CONFIG_FILE) - if analysis_reg: - return {valid_raw_metadata_file_tag: 
raw_metadata_file} - else: - return {valid_raw_metadata_file_tag: ""} + if not analysis_reg: + raw_metadata_file = "" + return raw_metadata_file except Exception as e: message = \ f"Failed to check existing raw_analysis_metadata, error: {e}" @@ -242,6 +236,21 @@ def register_analysis_in_db( "analysis_name": analysis_name, "analysis_description": analysis_json}] aa.store_analysis_data(data) + ## fetch analysis id + analysis_id = \ + aa.check_analysis_record_by_analysis_name_and_project_id( + analysis_name=analysis_name, + project_id=project_id, + output_mode='one_or_none') + if analysis_id is None: + raise ValueError( + f"No analysis id found for {analysis_name}") + ## create pipeline seed status for analysis id and pipeline id + data = [{ + 'pipeline_id': pipeline_id, + 'seed_id': analysis_id[0], + 'seed_table': 'analysis'}] + pl.create_pipeline_seed(data) aa.close_session() return True except Exception as e: @@ -256,10 +265,10 @@ def register_analysis_in_db( retries=4, queue='hpc_4G', multiple_outputs=False) -def register_raw_analysis_metadata_in_db(valid_raw_metadata_file): +def register_raw_analysis_metadata_in_db(valid_raw_metadata_file: str) -> bool: try: if valid_raw_metadata_file == "": - return {"status": False} + return False else: check_file_path(valid_raw_metadata_file) with open(valid_raw_metadata_file, "r") as fp: @@ -281,7 +290,7 @@ def register_raw_analysis_metadata_in_db(valid_raw_metadata_file): analysis_name=analysis_name, analysis_yaml=analysis_yaml, dbconf_json=DATABASE_CONFIG_FILE) - return {"status": status} + return status except Exception as e: message = \ f"Failed to register raw_analysis_metadata, error: {e}" @@ -296,9 +305,10 @@ def register_raw_analysis_metadata_in_db(valid_raw_metadata_file): task_id="mark_metadata_synced_on_portal", retry_delay=timedelta(minutes=5), retries=4, - queue='hpc_4G', - multiple_outputs=False) -def mark_metadata_synced_on_portal(raw_analysis_id, registration_status): + queue='hpc_4G') +def 
mark_metadata_synced_on_portal( + raw_analysis_id: int, + registration_status: bool) -> None: try: if registration_status: _ = \ diff --git a/igf_airflow/utils/dag46_scRNA_10X_flex_utils.py b/igf_airflow/utils/dag46_scRNA_10X_flex_utils.py new file mode 100644 index 000000000..ea5184321 --- /dev/null +++ b/igf_airflow/utils/dag46_scRNA_10X_flex_utils.py @@ -0,0 +1,136 @@ +import os +import logging +import pandas as pd +from typing import Any +from datetime import timedelta +from airflow.models import Variable +from airflow.decorators import task +from igf_airflow.utils.generic_airflow_utils import ( + send_airflow_failed_logs_to_channels) +from igf_data.utils.fileutils import ( + check_file_path) +from igf_airflow.utils.dag34_cellranger_multi_scRNA_utils import ( + prepare_cellranger_run_dir_and_script_file, + parse_analysis_design_and_get_metadata) + +log = logging.getLogger(__name__) + +MS_TEAMS_CONF = \ + Variable.get('analysis_ms_teams_conf', default_var=None) +DATABASE_CONFIG_FILE = \ + Variable.get('database_config_file', default_var=None) +HPC_BASE_RAW_DATA_PATH = \ + Variable.get('hpc_base_raw_data_path', default_var=None) +HPC_FILE_LOCATION = \ + Variable.get("hpc_file_location", default_var="HPC_PROJECT") + +## CELLRANGER +CELLRANGER_MULTI_SCRIPT_TEMPLATE = \ + Variable.get("cellranger_multi_script_template", default_var=None) + +def _get_cellranger_sample_group( + sample_metadata: dict[str, Any], + cellranger_group_name: str = 'cellranger_group', + required_tag_name: str = 'feature_types', + required_tag_value: str = 'Gene Expression') \ + -> list[str]: + try: + unique_sample_groups = set() + required_tag_list = list() + for _, group in sample_metadata.items(): + grp_name = group.get(cellranger_group_name) + if grp_name is None: + raise KeyError( + "Missing cellranger_group in sample_metadata") + unique_sample_groups.add(grp_name) + required_tag_list.append({ + "name": grp_name, + required_tag_name: group.get(required_tag_name)}) + if 
len(unique_sample_groups) == 0: + raise ValueError("No sample group found") + unique_sample_groups = \ + list(unique_sample_groups) + required_tag_df = \ + pd.DataFrame(required_tag_list) + ## check for required tags + for g in unique_sample_groups: + g_tag_values = ( + required_tag_df[required_tag_df['name']==g][required_tag_name] + .values + .tolist() + ) + if required_tag_value not in g_tag_values: + raise KeyError( + f"No {required_tag_value} found for group {g}") + return unique_sample_groups + except Exception as e: + raise ValueError( + f'Failed to get cellranger sample group, error: {e}') + + +## TASK +@task( + task_id="prepare_cellranger_flex_script", + retry_delay=timedelta(minutes=5), + retries=4, + queue='hpc_4G') +def prepare_cellranger_flex_script( + design_dict: dict, + work_dir: str, + analysis_design_tag: str = "analysis_design", + cellranger_group_name: str = 'cellranger_group', + required_tag_name: str = 'feature_types', + required_tag_value: str = 'Gene Expression') -> dict: + + """ + Create cellranger flex script + """ + try: + ## get sample metadata + design_file = \ + design_dict.get(analysis_design_tag) + check_file_path(design_file) + with open(design_file, 'r') as fp: + input_design_yaml = fp.read() + sample_metadata, analysis_metadata = \ + parse_analysis_design_and_get_metadata( + input_design_yaml=input_design_yaml) + if sample_metadata is None or \ + analysis_metadata is None: + raise KeyError( + "Missing sample or analysis metadata") + ## check if only one sample group is present or not + sample_groups = \ + _get_cellranger_sample_group( + sample_metadata=sample_metadata, + required_tag_name=required_tag_name, + required_tag_value=required_tag_value) + ## check if correct number of sample groups are present + ## reset sample group if more than one groups are present + if len(sample_groups) == 0: + raise ValueError( + "No sample group has been found, file: " + \ + f"{design_file}") + sample_group = \ + sample_groups[0] + _, 
run_script_file = \ + prepare_cellranger_run_dir_and_script_file( + sample_group=str(sample_group), + work_dir=work_dir, + output_dir=os.path.join(work_dir, str(sample_group)), + design_file=design_file, + db_config_file=DATABASE_CONFIG_FILE, + run_script_template=CELLRANGER_MULTI_SCRIPT_TEMPLATE, + cellranger_group_tag=cellranger_group_name, + feature_types_tag=required_tag_name) + analysis_script_info = { + "sample_group": sample_group, + "run_script": run_script_file, + "output_dir": os.path.join(work_dir, sample_group)} + return analysis_script_info + except Exception as e: + log.error(e) + send_airflow_failed_logs_to_channels( + ms_teams_conf=MS_TEAMS_CONF, + message_prefix=e) + raise ValueError(e) \ No newline at end of file diff --git a/igf_airflow/utils/generic_airflow_tasks.py b/igf_airflow/utils/generic_airflow_tasks.py index 0e731c28d..4e66964bd 100644 --- a/igf_airflow/utils/generic_airflow_tasks.py +++ b/igf_airflow/utils/generic_airflow_tasks.py @@ -219,13 +219,15 @@ def mark_analysis_failed( queue='hpc_4G') def send_email_to_user( send_email: bool = True, - email_user_key: str = 'username') -> None: + email_user_key: str = 'username', + analysis_email_template: str = ANALYSES_EMAIL_CONFIG) -> None: """ An Airflow task for sending email to registered users for updating analysis pipeline status Parameters: send_email (bool): A toggle for sending email to primary user if "True" (default) or fall back to default user if "False" email_user_key (str): Key for the default user as mentioned in the email config file, default is 'username' + analysis_email_template (str): A template for email, default is ANALYSES_EMAIL_CONFIG """ try: ## dag_run.conf should have analysis_id @@ -254,7 +256,7 @@ def send_email_to_user( email_text_file, receivers = \ generate_email_text_for_analysis( analysis_id=analysis_id, - template_path=ANALYSES_EMAIL_CONFIG, + template_path=analysis_email_template, dbconfig_file=DATABASE_CONFIG_FILE, default_email_user=default_email_user, 
send_email_to_user=send_email) diff --git a/template/geomx/geomx_ngs_pipeline.sh b/template/geomx/geomx_ngs_pipeline.sh index 8bd1b78d1..fefe4c532 100644 --- a/template/geomx/geomx_ngs_pipeline.sh +++ b/template/geomx/geomx_ngs_pipeline.sh @@ -1,6 +1,8 @@ #!/bin/bash set -eo pipefail +eval "$(~/anaconda3/bin/conda shell.bash hook)" +source activate expect cd {{ WORK_DIR }} @@ -17,4 +19,4 @@ script=$(cat << EOF expect "*All done*" EOF ) -/usr/bin/expect -c "$script" \ No newline at end of file +expect -c "$script" \ No newline at end of file diff --git a/test/__init__.py b/test/__init__.py index 7a4d59b1a..39800dafd 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -4,123 +4,193 @@ def get_tests(): return full_suite() def full_suite(): - from .process.basesmask_test import BasesMask_testA - from .process.basesmask_test import BasesMask_testB - from .process.basesmask_test import BasesMask_testC - from .process.basesmask_test import BasesMask_testD - from .process.checksequenceIndexbarcodes_test import CheckSequenceIndexBarcodes_test1 - from .process.collect_seqrun_fastq_to_db_test import Collect_fastq_test1 - from .process.collect_seqrun_fastq_to_db_test import Collect_fastq_test_sc1 - from .dbadaptor.collectionadaptor_test import CollectionAdaptor_test1 - from .dbadaptor.collectionadaptor_test import CollectionAdaptor_test2 - from .dbadaptor.collectionadaptor_test import CollectionAdaptor_test3 - from .utils.dbutils_test import Dbutils_test1 - from .process.find_and_process_new_seqrun_test import Find_seqrun_test1 - from .process.flowcell_rules_test import Flowcell_barcode_rule_test1 - from .process.moveBclFilesForDemultiplexing_test import MiSeqRunInfo - from .process.moveBclFilesForDemultiplexing_test import Hiseq4000RunInfo as Hiseq4000RunInfo_moveBcl - from .dbadaptor.pipelineadaptor_test import Pipelineadaptor_test1 - from .dbadaptor.pipelineadaptor_test import Pipelineadaptor_test2 - from .dbadaptor.pipelineadaptor_test import Pipelineadaptor_test3 - 
from .dbadaptor.pipelineadaptor_test import Pipelineadaptor_test4 - from .utils.pipelineutils_test import Pipelineutils_test1,Pipelineutils_test2 - from .utils.platformutils_test import Platformutils_test1,Platformutils_test2 - from .dbadaptor.runadaptor_test import RunAdaptor_test1 - from .process.runinfo_xml_test import Hiseq4000RunInfo as Hiseq4000RunInfo_runinfo_xml + from .process.basesmask_test import ( + BasesMask_testA, + BasesMask_testB, + BasesMask_testC, + BasesMask_testD) + from .process.checksequenceIndexbarcodes_test import ( + CheckSequenceIndexBarcodes_test1) + from .process.collect_seqrun_fastq_to_db_test import ( + Collect_fastq_test1, + Collect_fastq_test_sc1) + from .dbadaptor.collectionadaptor_test import ( + CollectionAdaptor_test1, + CollectionAdaptor_test2, + CollectionAdaptor_test3) + from .utils.dbutils_test import ( + Dbutils_test1) + from .process.find_and_process_new_seqrun_test import ( + Find_seqrun_test1) + from .process.flowcell_rules_test import ( + Flowcell_barcode_rule_test1) + from .process.moveBclFilesForDemultiplexing_test import ( + MiSeqRunInfo) + from .process.moveBclFilesForDemultiplexing_test import \ + Hiseq4000RunInfo as Hiseq4000RunInfo_moveBcl + from .dbadaptor.pipelineadaptor_test import ( + Pipelineadaptor_test1, + Pipelineadaptor_test2, + Pipelineadaptor_test3, + Pipelineadaptor_test4) + from .utils.pipelineutils_test import ( + Pipelineutils_test1, + Pipelineutils_test2) + from .utils.platformutils_test import ( + Platformutils_test1, + Platformutils_test2) + from .dbadaptor.runadaptor_test import ( + RunAdaptor_test1) + from .process.runinfo_xml_test import \ + Hiseq4000RunInfo as Hiseq4000RunInfo_runinfo_xml from .process.runparameters_xml_test import Hiseq4000RunParam - from .process.samplesheet_test import Hiseq4000SampleSheet - from .process.samplesheet_test import TestValidateSampleSheet - from .process.samplesheet_test import SampleSheet_format_v2_test1 - from .process.samplesheet_test import 
TestValidateSampleSheet1 - from .process.samplesheet_test import TestValidateSampleSheet2 - from .process.find_and_register_new_project_data_test import Find_and_register_project_data1 - from .dbadaptor.useradaptor_test import Useradaptor_test1 - from .dbadaptor.sampleadaptor_test import Sampleadaptor_test1 - from .dbadaptor.sampleadaptor_test import Sampleadaptor_test2 - from .dbadaptor.sampleadaptor_test import Sampleadaptor_test3 - from .dbadaptor.sampleadaptor_test import Sampleadaptor_test4 - from .process.processsinglecellsamplesheet_test import ProcessSingleCellSamplesheet_testA - from .process.processsinglecellsamplesheet_test import ProcessSingleCellSamplesheet_testB - from .process.processsinglecellsamplesheet_test import ProcessSingleCellDualIndexSamplesheetA - from .process.mergesinglecellfastq_test import MergeSingleCellFastq_testA - from .process.mergesinglecellfastq_test import MergeSingleCellFastq_testB - from .utils.project_data_display_utils_test import Convert_project_data_gviz_data1 - from .utils.project_data_display_utils_test import Add_seqrun_path_info1 - from .utils.projectutils_test import Projectutils_test1, Projectutils_test3 + from .process.samplesheet_test import ( + Hiseq4000SampleSheet, + TestValidateSampleSheet, + SampleSheet_format_v2_test1, + TestValidateSampleSheet1, + TestValidateSampleSheet2) + from .process.find_and_register_new_project_data_test import ( + Find_and_register_project_data1) + from .dbadaptor.useradaptor_test import ( + Useradaptor_test1) + from .dbadaptor.sampleadaptor_test import ( + Sampleadaptor_test1, + Sampleadaptor_test2, + Sampleadaptor_test3, + Sampleadaptor_test4) + from .process.processsinglecellsamplesheet_test import ( + ProcessSingleCellSamplesheet_testA, + ProcessSingleCellSamplesheet_testB, + ProcessSingleCellDualIndexSamplesheetA) + from .process.mergesinglecellfastq_test import ( + MergeSingleCellFastq_testA, + MergeSingleCellFastq_testB) + from .utils.project_data_display_utils_test import ( + 
Convert_project_data_gviz_data1, + Add_seqrun_path_info1) + from .utils.projectutils_test import ( + Projectutils_test1, + Projectutils_test3) #from .utils.projectutils_test import Projectutils_test2 - from .dbadaptor.fileadaptor_test import Fileadaptor_test1 - from .process.reset_samplesheet_md5_test import Reset_samplesheet_md5_test1 - from .process.modify_pipeline_seed_test import Modify_pipeline_seed_test1 - from .process.experiment_metadata_updator_test import Experiment_metadata_updator_test - from .dbadaptor.projectadaptor_test import Projectadaptor_test1 - from .dbadaptor.projectadaptor_test import Projectadaptor_test2 - from .utils.analysis_collection_utils_test import Analysis_collection_utils_test1 - from .utils.fileutils_test import Fileutils_test1 - from .utils.pipeseedfactory_utils_test import Pipeseedfactory_utils_test1 - from .utils.reference_genome_utils_test import Reference_genome_utils_test1 - from .dbadaptor.experimentadaptor_test import ExperimentAdaptor_test1 - from .utils.picard_util_test import Picard_util_test1,Picard_util_test2 - from .utils.samtools_utils_test import Samtools_util_test1,Samtools_util_test2 - from .utils.deeptools_utils_test import Deeptools_util_test1 - from .utils.ppqt_utils_test import Ppqt_util_test1 - from .dbadaptor.baseadaptor_test import Baseadaptor_test1 - from .dbadaptor.platformadaptor_test import Platformadaptor_test1 - from .utils.metadata_validation_test import Validate_project_and_samplesheet_metadata_test1 - from .utils.metadata_validation_test import Validate_project_and_samplesheet_metadata_test2 - from .utils.project_analysis_utils_test import Project_analysis_test1 - from .utils.project_analysis_utils_test import Project_analysis_test2 - from .utils.project_analysis_utils_test import Project_analysis_test3 - from .process.project_pooling_info_test import Project_pooling_info_test1 - from .utils.analysis_fastq_fetch_utils_test import Analysis_fastq_fetch_utils_test1 - from 
.utils.analysis_fastq_fetch_utils_test import Analysis_fastq_fetch_utils_test2 - from .process.reformat_metadata_file_test import Reformat_metadata_file_testA - from .process.reformat_samplesheet_file_test import Reformat_samplesheet_file_testA - from .utils.singularity_run_wrapper_test import Singularity_run_test1 - from .utils.jupyter_nbconvert_wrapper_test import Nbconvert_execute_test1 - from .igf_airflow.calculate_seqrun_file_size_test import Calculate_seqrun_file_list_testA - from .igf_airflow.ongoing_seqrun_processing_test import Compare_existing_seqrun_filesA - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestA - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestB - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestC - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestD - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestE - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestF - from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import Dag9_tenx_single_cell_immune_profiling_utilstestG - from .igf_airflow.dag10_nextflow_atacseq_pipeline_utils_test import Dag10_nextflow_atacseq_pipeline_utils_testA - from .igf_airflow.dag10_nextflow_atacseq_pipeline_utils_test import Dag10_nextflow_atacseq_pipeline_utils_testB - from .igf_airflow.dag17_create_transcriptome_ref_utils_test import Dag17_create_transcriptome_ref_utils_test_utilstestA - from .igf_airflow.dag18_upload_and_trigger_analysis_utils_test import Dag18_upload_and_trigger_analysis_utils_testA - from .igf_airflow.dag18_upload_and_trigger_analysis_utils_test 
import Dag18_upload_and_trigger_analysis_utils_testB - from .igf_airflow.dag18_upload_and_trigger_analysis_utils_test import Dag18_upload_and_trigger_analysis_utils_testC - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testA - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testB - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testC - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testD - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testE - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testF - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testG - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testH - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testI - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testJ - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testK - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testL - from .igf_airflow.dag22_bclconvert_demult_utils_test import Dag22_bclconvert_demult_utils_testM - from .utils.cellranger_count_utils_test import Cellranger_count_utils_testA - from .dbadaptor.analysisadaptor_test import Analysisadaptor_test1 - from .dbadaptor.seqrunadaptor_test import SeqrunAdaptor_test1 - from .utils.samplesheet_utils_test import SamplesheetUtils_testA - from .igf_nextflow.nextflow_config_formatter_test import Nextflow_config_formatter_testA - from .igf_nextflow.nextflow_design_test import Nextflow_design_testA - from .igf_nextflow.nextflow_runner_test import Nextflow_pre_run_setup_testA - from 
.igf_portal.test_metadata_utils import Metadata_dump_test, Metadata_load_test - from .process.find_and_process_new_project_data_from_portal_db_test import Find_and_register_new_project_data_from_portal_db_test1 - from .igf_airflow.dag23_test_bclconvert_demult_utils_test import Dag23_test_bclconvert_demult_utils_testA - from .igf_airflow.dag25_copy_seqruns_to_hpc_utils_test import TestDag25_copy_seqruns_to_hpc_utilsA - from .igf_airflow.dag25_copy_seqruns_to_hpc_utils_test import TestDag25_copy_seqruns_to_hpc_utilsB - from .igf_airflow.dag26_snakemake_rnaseq_utils_test import TestDag26_snakemake_rnaseq_utilsA - from .igf_airflow.dag26_snakemake_rnaseq_utils_test import TestDag26_snakemake_rnaseq_utilsB - from .igf_airflow.dag26_snakemake_rnaseq_utils_test import TestDag26_snakemake_rnaseq_utilsC - from .igf_airflow.dag27_cleanup_demultiplexing_output_utils_test import TestDag27_cleanup_demultiplexing_output_utilsA + from .dbadaptor.fileadaptor_test import ( + Fileadaptor_test1) + from .process.reset_samplesheet_md5_test import ( + Reset_samplesheet_md5_test1) + from .process.modify_pipeline_seed_test import ( + Modify_pipeline_seed_test1) + from .process.experiment_metadata_updator_test import ( + Experiment_metadata_updator_test) + from .dbadaptor.projectadaptor_test import ( + Projectadaptor_test1, + Projectadaptor_test2) + from .utils.analysis_collection_utils_test import ( + Analysis_collection_utils_test1) + from .utils.fileutils_test import ( + Fileutils_test1) + from .utils.pipeseedfactory_utils_test import ( + Pipeseedfactory_utils_test1) + from .utils.reference_genome_utils_test import ( + Reference_genome_utils_test1) + from .dbadaptor.experimentadaptor_test import ( + ExperimentAdaptor_test1) + from .utils.picard_util_test import ( + Picard_util_test1,Picard_util_test2) + from .utils.samtools_utils_test import ( + Samtools_util_test1,Samtools_util_test2) + from .utils.deeptools_utils_test import ( + Deeptools_util_test1) + from .utils.ppqt_utils_test 
import ( + Ppqt_util_test1) + from .dbadaptor.baseadaptor_test import ( + Baseadaptor_test1) + from .dbadaptor.platformadaptor_test import ( + Platformadaptor_test1) + from .utils.metadata_validation_test import ( + Validate_project_and_samplesheet_metadata_test1, + Validate_project_and_samplesheet_metadata_test2) + from .utils.project_analysis_utils_test import ( + Project_analysis_test1, + Project_analysis_test2, + Project_analysis_test3) + from .process.project_pooling_info_test import ( + Project_pooling_info_test1) + from .utils.analysis_fastq_fetch_utils_test import ( + Analysis_fastq_fetch_utils_test1, + Analysis_fastq_fetch_utils_test2) + from .process.reformat_metadata_file_test import ( + Reformat_metadata_file_testA) + from .process.reformat_samplesheet_file_test import ( + Reformat_samplesheet_file_testA) + from .utils.singularity_run_wrapper_test import ( + Singularity_run_test1) + from .utils.jupyter_nbconvert_wrapper_test import ( + Nbconvert_execute_test1) + from .igf_airflow.calculate_seqrun_file_size_test import ( + Calculate_seqrun_file_list_testA) + from .igf_airflow.ongoing_seqrun_processing_test import ( + Compare_existing_seqrun_filesA) + from .igf_airflow.dag9_tenx_single_cell_immune_profiling_utils_test import ( + Dag9_tenx_single_cell_immune_profiling_utilstestA, + Dag9_tenx_single_cell_immune_profiling_utilstestB, + Dag9_tenx_single_cell_immune_profiling_utilstestC, + Dag9_tenx_single_cell_immune_profiling_utilstestD, + Dag9_tenx_single_cell_immune_profiling_utilstestE, + Dag9_tenx_single_cell_immune_profiling_utilstestF, + Dag9_tenx_single_cell_immune_profiling_utilstestG) + from .igf_airflow.dag10_nextflow_atacseq_pipeline_utils_test import ( + Dag10_nextflow_atacseq_pipeline_utils_testA, + Dag10_nextflow_atacseq_pipeline_utils_testB) + from .igf_airflow.dag17_create_transcriptome_ref_utils_test import ( + Dag17_create_transcriptome_ref_utils_test_utilstestA) + from .igf_airflow.dag18_upload_and_trigger_analysis_utils_test import ( + 
Dag18_upload_and_trigger_analysis_utils_testA, + Dag18_upload_and_trigger_analysis_utils_testB, + Dag18_upload_and_trigger_analysis_utils_testC) + from .igf_airflow.dag22_bclconvert_demult_utils_test import ( + Dag22_bclconvert_demult_utils_testA, + Dag22_bclconvert_demult_utils_testB, + Dag22_bclconvert_demult_utils_testC, + Dag22_bclconvert_demult_utils_testD, + Dag22_bclconvert_demult_utils_testE, + Dag22_bclconvert_demult_utils_testF, + Dag22_bclconvert_demult_utils_testG, + Dag22_bclconvert_demult_utils_testH, + Dag22_bclconvert_demult_utils_testI, + Dag22_bclconvert_demult_utils_testJ, + Dag22_bclconvert_demult_utils_testK, + Dag22_bclconvert_demult_utils_testL, + Dag22_bclconvert_demult_utils_testM) + from .utils.cellranger_count_utils_test import ( + Cellranger_count_utils_testA) + from .dbadaptor.analysisadaptor_test import ( + Analysisadaptor_test1) + from .dbadaptor.seqrunadaptor_test import ( + SeqrunAdaptor_test1) + from .utils.samplesheet_utils_test import ( + SamplesheetUtils_testA) + from .igf_nextflow.nextflow_config_formatter_test import ( + Nextflow_config_formatter_testA) + from .igf_nextflow.nextflow_design_test import ( + Nextflow_design_testA) + from .igf_nextflow.nextflow_runner_test import ( + Nextflow_pre_run_setup_testA) + from .igf_portal.test_metadata_utils import ( + Metadata_dump_test, + Metadata_load_test) + from .process.find_and_process_new_project_data_from_portal_db_test import ( + Find_and_register_new_project_data_from_portal_db_test1) + from .igf_airflow.dag23_test_bclconvert_demult_utils_test import ( + Dag23_test_bclconvert_demult_utils_testA) + from .igf_airflow.dag25_copy_seqruns_to_hpc_utils_test import ( + TestDag25_copy_seqruns_to_hpc_utilsA, + TestDag25_copy_seqruns_to_hpc_utilsB) + from .igf_airflow.dag26_snakemake_rnaseq_utils_test import ( + TestDag26_snakemake_rnaseq_utilsA, + TestDag26_snakemake_rnaseq_utilsB, + TestDag26_snakemake_rnaseq_utilsC) + from .igf_airflow.dag27_cleanup_demultiplexing_output_utils_test 
import ( + TestDag27_cleanup_demultiplexing_output_utilsA) from .igf_airflow.test_dag33_geomx_processing_util import ( TestDag33_geomx_processing_util_utilsA, TestDag33_geomx_processing_util_utilsB, @@ -149,6 +219,8 @@ def full_suite(): from .process.unified_metadata_registration_test import ( TestUnifiedMetadataRegistrationA, TestUnifiedMetadataRegistrationB) + from .igf_airflow.test_dag46_scRNA_10X_flex_utils import ( + Test_dag46_scRNA_10X_flex_utilsA) return unittest.TestSuite([ @@ -293,6 +365,7 @@ def full_suite(): TestDag39_project_cleanup_step3_utilsA, Test_dag1_calculate_hpc_worker_utils, TestUnifiedMetadataRegistrationA, - TestUnifiedMetadataRegistrationB + TestUnifiedMetadataRegistrationB, + Test_dag46_scRNA_10X_flex_utilsA ] ]) \ No newline at end of file diff --git a/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py b/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py index 2766c883a..266eb7554 100644 --- a/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py +++ b/test/igf_airflow/test_dag34_cellranger_multi_scRNA_utils.py @@ -1,20 +1,13 @@ import os -import json -import yaml -import zipfile import unittest import pandas as pd from unittest.mock import patch -from yaml import Loader, Dumper from igf_data.igfdb.igfTables import Base from igf_data.igfdb.baseadaptor import BaseAdaptor -from igf_data.igfdb.pipelineadaptor import PipelineAdaptor -from igf_data.igfdb.analysisadaptor import AnalysisAdaptor from igf_data.igfdb.projectadaptor import ProjectAdaptor from igf_data.igfdb.platformadaptor import PlatformAdaptor from igf_data.igfdb.seqrunadaptor import SeqrunAdaptor from igf_data.igfdb.sampleadaptor import SampleAdaptor -from igf_data.igfdb.useradaptor import UserAdaptor from igf_data.igfdb.experimentadaptor import ExperimentAdaptor from igf_data.igfdb.runadaptor import RunAdaptor from igf_data.igfdb.collectionadaptor import CollectionAdaptor @@ -22,7 +15,6 @@ from igf_data.utils.dbutils import read_dbconf_json from 
igf_data.utils.fileutils import ( get_temp_dir, - check_file_path, remove_dir) from igf_airflow.utils.dag26_snakemake_rnaseq_utils import ( parse_analysis_design_and_get_metadata) @@ -286,7 +278,6 @@ def test_prepare_cellranger_run_dir_and_script_file(self): with open(library_csv_file, 'r') as fp: for i in fp: if i.startswith('['): - data_list = list() ge_start = False lib_start = False if ge_start: @@ -341,7 +332,6 @@ def test_prepare_cellranger_script(self): sample_group='grp1', design_dict=design_dict) script_file = output_dict.get("run_script") - output_dir = output_dict.get("output_dir") sample_group = output_dict.get("sample_group") self.assertTrue(os.path.exists(script_file)) with open(script_file, 'r') as fp: diff --git a/test/igf_airflow/test_dag44_analysis_registration_utils.py b/test/igf_airflow/test_dag44_analysis_registration_utils.py index bde43ce20..64c5d9a4e 100644 --- a/test/igf_airflow/test_dag44_analysis_registration_utils.py +++ b/test/igf_airflow/test_dag44_analysis_registration_utils.py @@ -81,16 +81,15 @@ def test_find_raw_metadata_id(self, mock_get_context): mock_context.get.return_value = mock_context.dag_run mock_context.dag_run.conf.get.return_value = 1 mock_get_context.return_value = mock_context - raw_analysis_info = find_raw_metadata_id.function() - assert "raw_analysis_id" in raw_analysis_info - assert raw_analysis_info.get("raw_analysis_id") == 1 + raw_analysis_id = find_raw_metadata_id.function() + assert raw_analysis_id == 1 @patch("igf_airflow.utils.dag44_analysis_registration_utils.get_data_from_portal", return_value={'project_id': 1, 'pipeline_id': 2, 'analysis_name': 'a', 'analysis_yaml': 'b:'}) def test_fetch_raw_metadata_from_portal(self, *args): - raw_metadata_info = \ + raw_metadata_file = \ fetch_raw_metadata_from_portal.function(raw_analysis_id=1) - assert "raw_metadata_file" in raw_metadata_info + assert os.path.exists(raw_metadata_file) def test_check_registered_analysis_in_db(self): @@ -125,16 +124,14 @@ def 
test_check_raw_metadata_in_db(self, *args): 'analysis_name': 'analysis_2', 'pipeline_id': 1, 'analysis_yaml': 'b:'}, fp) - valid_raw_metadata_info = \ + valid_raw_metadata_file = \ check_raw_metadata_in_db.function( raw_metadata_file=json_file_1) - assert "valid_raw_metadata_file" in valid_raw_metadata_info - assert valid_raw_metadata_info.get("valid_raw_metadata_file") == "" - valid_raw_metadata_info = \ + assert valid_raw_metadata_file == "" + valid_raw_metadata_file = \ check_raw_metadata_in_db.function( raw_metadata_file=json_file_2) - assert "valid_raw_metadata_file" in valid_raw_metadata_info - assert valid_raw_metadata_info.get("valid_raw_metadata_file") == json_file_2 + assert valid_raw_metadata_file == json_file_2 def test_register_analysis_in_db(self): status = \ @@ -159,9 +156,18 @@ def test_register_analysis_in_db(self): analysis_id = \ aa.check_analysis_record_by_analysis_name_and_project_id( analysis_name='analysis_2', - project_id=2) - aa.close_session() + project_id=2, + output_mode='one_or_none') assert analysis_id is not None + pl = PipelineAdaptor(**{'session': aa.session}) + pipe_seed = \ + pl.fetch_pipeline_seed( + pipeline_id=1, + seed_id=analysis_id[0], + seed_table='analysis', + output_mode='one_or_none') + assert pipe_seed is not None + aa.close_session() with pytest.raises(Exception): status = \ register_analysis_in_db( @@ -190,16 +196,14 @@ def test_register_raw_analysis_metadata_in_db(self, *args): 'analysis_name': 'analysis_2', 'pipeline_id': 1, 'analysis_yaml': 'b:'}, fp) - status_info = \ + status = \ register_raw_analysis_metadata_in_db.function( valid_raw_metadata_file=json_file_1) - assert "status" in status_info - assert status_info.get("status") is False - status_info = \ + assert status is False + status = \ register_raw_analysis_metadata_in_db.function( valid_raw_metadata_file=json_file_2) - assert "status" in status_info - assert status_info.get("status") is True + assert status is True aa = \ 
AnalysisAdaptor(**{'session_class': self.session_class}) aa.start_session() diff --git a/test/igf_airflow/test_dag46_scRNA_10X_flex_utils.py b/test/igf_airflow/test_dag46_scRNA_10X_flex_utils.py new file mode 100644 index 000000000..2ab80e3bd --- /dev/null +++ b/test/igf_airflow/test_dag46_scRNA_10X_flex_utils.py @@ -0,0 +1,262 @@ +import os +import unittest +import pytest +from unittest.mock import patch +from igf_data.utils.fileutils import ( + get_temp_dir, + remove_dir) +from yaml import load, SafeLoader +from igf_airflow.utils.dag46_scRNA_10X_flex_utils import ( + _get_cellranger_sample_group, + prepare_cellranger_flex_script) +from igf_data.igfdb.igfTables import Base +from igf_data.igfdb.baseadaptor import BaseAdaptor +from igf_data.igfdb.projectadaptor import ProjectAdaptor +from igf_data.igfdb.platformadaptor import PlatformAdaptor +from igf_data.igfdb.seqrunadaptor import SeqrunAdaptor +from igf_data.igfdb.sampleadaptor import SampleAdaptor +from igf_data.igfdb.experimentadaptor import ExperimentAdaptor +from igf_data.igfdb.runadaptor import RunAdaptor +from igf_data.igfdb.collectionadaptor import CollectionAdaptor +from igf_data.igfdb.fileadaptor import FileAdaptor +from igf_data.utils.dbutils import read_dbconf_json + +DESIGN_YAML = """sample_metadata: + IGFsampleA: + feature_types: Gene Expression + cellranger_group: grp1 + IGFsampleB: + feature_types: Antibody Capture + cellranger_group: grp1 +analysis_metadata: + cellranger_multi_config: + - "[gene-expression]" + - "reference,/REF" + - "r1-length,28" + - "r2-length,90" + - "chemistry,auto" + - "expect-cells,60000" + - "force-cells,6000" + - "include-introns,true" + - "no-secondary,false" + - "no-bam,false" + - "check-library-compatibility,true" + - "min-assignment-confidence,0.9" + - "cmo-set,/path/custom/cmo.csv" + - "[vdj]" + - "reference,/path" + - "r1-length,28" + - "r2-length,90" + - "[samples]" + - "sample_id,cmo_ids" + - "IGF3,CMO3" + scanpy_config: + active: true + mito_prefix: MT- + 
run_scrublet: true + run_cellcycle_score: true + cell_marker_list: /path/PangaloDB + cell_marker_species: HG38 + s_genes: '' + g2m_genes: '' + cell_marker_mode: NON-VDJ + scvelo: + active: true""" + +class Test_dag46_scRNA_10X_flex_utilsA(unittest.TestCase): + def setUp(self): + self.temp_dir = get_temp_dir() + self.design_yaml = DESIGN_YAML + self.yaml_file = \ + os.path.join( + self.temp_dir, + 'analysis_design.yaml') + with open(self.yaml_file, 'w') as fp: + fp.write(self.design_yaml) + self.dbconfig = 'data/dbconfig.json' + dbparam = read_dbconf_json(self.dbconfig) + base = BaseAdaptor(**dbparam) + self.engine = base.engine + self.dbname = dbparam['dbname'] + Base.metadata.create_all(self.engine) + self.session_class = base.get_session_class() + base.start_session() + platform_data = [{ + "platform_igf_id" : "M03291", + "model_name" : "MISEQ", + "vendor_name" : "ILLUMINA", + "software_name" : "RTA", + "software_version" : "RTA1.18.54"}] + flowcell_rule_data = [{ + "platform_igf_id": "M03291", + "flowcell_type": "MISEQ", + "index_1": "NO_CHANGE", + "index_2": "NO_CHANGE"}] + pl = PlatformAdaptor(**{'session':base.session}) + pl.store_platform_data(data=platform_data) + pl.store_flowcell_barcode_rule(data=flowcell_rule_data) + seqrun_data = [{ + 'seqrun_igf_id': '180416_M03291_0139_000000000-BRN47', + 'flowcell_id': '000000000-BRN47', + 'platform_igf_id': 'M03291', + 'flowcell': 'MISEQ'}] + sra = SeqrunAdaptor(**{'session':base.session}) + sra.store_seqrun_and_attribute_data(data=seqrun_data) + project_data = [{'project_igf_id':'IGFQprojectA'}] + pa = ProjectAdaptor(**{'session':base.session}) + pa.store_project_and_attribute_data(data=project_data) + sample_data = [{ + 'sample_igf_id': 'IGFsampleA', + 'project_igf_id': 'IGFQprojectA', + 'species_name': 'HG38' + },{ + 'sample_igf_id': 'IGFsampleB', + 'project_igf_id': 'IGFQprojectA', + 'species_name': 'UNKNOWN' + }] + sa = SampleAdaptor(**{'session':base.session}) + 
sa.store_sample_and_attribute_data(data=sample_data) + experiment_data = [{ + 'project_igf_id': 'IGFQprojectA', + 'sample_igf_id': 'IGFsampleA', + 'experiment_igf_id': 'IGFsampleA_MISEQ', + 'library_name': 'IGFsampleA', + 'library_source': 'TRANSCRIPTOMIC', + 'library_strategy': 'RNA-SEQ', + 'experiment_type': 'POLYA-RNA', + 'library_layout': 'PAIRED', + 'platform_name': 'MISEQ', + },{ + 'project_igf_id': 'IGFQprojectA', + 'sample_igf_id': 'IGFsampleB', + 'experiment_igf_id': 'IGFsampleB_MISEQ', + 'library_name': 'IGFsampleB', + 'library_source': 'UNKNOWN', + 'library_strategy': 'UNKNOWN', + 'experiment_type': 'UNKNOWN', + 'library_layout': 'UNKNOWN', + 'platform_name': 'MISEQ', + }] + ea = ExperimentAdaptor(**{'session':base.session}) + ea.store_project_and_attribute_data(data=experiment_data) + run_data = [{ + 'experiment_igf_id': 'IGFsampleA_MISEQ', + 'seqrun_igf_id': '180416_M03291_0139_000000000-BRN47', + 'run_igf_id': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'lane_number': '1' + },{ + 'experiment_igf_id': 'IGFsampleB_MISEQ', + 'seqrun_igf_id': '180416_M03291_0139_000000000-BRN47', + 'run_igf_id': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'lane_number': '1' + }] + ra = RunAdaptor(**{'session':base.session}) + ra.store_run_and_attribute_data(data=run_data) + file_data = [ + {'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_R1_001.fastq.gz'}, + {'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_R2_001.fastq.gz'}, + {'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_I1_001.fastq.gz'}, + {'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_I2_001.fastq.gz'}, + {'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_R1_001.fastq.gz'}, + {'file_path': '/path/IGFSampleB/IGFsampleA_S1_L001_R2_001.fastq.gz'}, + {'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_I1_001.fastq.gz'}, + {'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_I2_001.fastq.gz'}] + fa = FileAdaptor(**{'session':base.session}) + fa.store_file_and_attribute_data(data=file_data) + collection_data = [{ + 
'name': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'table': 'run' + }, { + 'name': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'table': 'run' + }] + collection_files_data = [{ + 'name': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_R1_001.fastq.gz' + }, { + 'name': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_R2_001.fastq.gz' + }, { + 'name': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_I1_001.fastq.gz' + }, { + 'name': 'IGFsampleA_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleA/IGFsampleA_S1_L001_I2_001.fastq.gz' + }, { + 'name': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_R1_001.fastq.gz' + }, { + 'name': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleB/IGFsampleA_S1_L001_R2_001.fastq.gz' + }, { + 'name': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_I1_001.fastq.gz' + }, { + 'name': 'IGFsampleB_MISEQ_000000000-BRN47_1', + 'type': 'demultiplexed_fastq', + 'file_path': '/path/IGFSampleB/IGFsampleB_S2_L001_I2_001.fastq.gz' + }] + ca = CollectionAdaptor(**{'session':base.session}) + ca.store_collection_and_attribute_data(data=collection_data) + ca.create_collection_group(data=collection_files_data) + base.close_session() + + def tearDown(self): + Base.metadata.drop_all(self.engine) + if os.path.exists(self.dbname): + os.remove(self.dbname) + remove_dir(self.temp_dir) + + + def test_get_cellranger_sample_group(self): + design_json = load(self.design_yaml, Loader=SafeLoader) + sample_metadata = design_json.get("sample_metadata") + 
sample_groups = \ + _get_cellranger_sample_group( + sample_metadata=sample_metadata) + assert len(sample_groups) == 1 + sample_metadata.update({"IGFsampleA": { + "feature_types": "Gene Expression", + "cellranger_group": "grp2"} + }) + assert "grp1" in sample_groups + with pytest.raises(Exception): + sample_groups = \ + _get_cellranger_sample_group( + sample_metadata=sample_metadata) + + def test_prepare_cellranger_flex_script(self): + design_dict = { + "analysis_design": self.yaml_file} + work_dir = \ + get_temp_dir( + self.temp_dir, + prefix="work") + with patch("igf_airflow.utils.dag46_scRNA_10X_flex_utils.DATABASE_CONFIG_FILE", + self.dbconfig): + with patch("igf_airflow.utils.dag46_scRNA_10X_flex_utils.CELLRANGER_MULTI_SCRIPT_TEMPLATE", + 'template/cellranger_template/cellranger_multi_run_script_v1.sh'): + output_dict = \ + prepare_cellranger_flex_script.\ + function( + work_dir=work_dir, + design_dict=design_dict) + script_file = output_dict.get("run_script") + sample_group = output_dict.get("sample_group") + assert os.path.exists(script_file) + with open(script_file, 'r') as fp: + data = fp.read() + assert sample_group == "grp1" + assert "--id=grp1" in data + +if __name__=='__main__': + unittest.main() \ No newline at end of file