199 changes: 48 additions & 151 deletions igf_airflow/utils/dag34_cellranger_multi_scRNA_utils.py

Large diffs are not rendered by default.

60 changes: 35 additions & 25 deletions igf_airflow/utils/dag44_analysis_registration_utils.py
@@ -42,12 +42,11 @@
   task_id="find_raw_metadata_id",
   retry_delay=timedelta(minutes=5),
   retries=4,
-  queue='hpc_4G',
-  multiple_outputs=False)
+  queue='hpc_4G')
 def find_raw_metadata_id(
     raw_analysis_id_tag: str = "raw_analysis_id",
     dag_run_key: str = "dag_run") \
-      -> Dict[str, int]:
+      -> int:
   try:
     ### dag_run.conf should have raw_analysis_id
     context = get_current_context()
@@ -61,7 +60,7 @@ def find_raw_metadata_id(
     if raw_analysis_id is None:
       raise ValueError(
         'raw_analysis_id not found in dag_run.conf')
-    return {raw_analysis_id_tag: raw_analysis_id}
+    return int(raw_analysis_id)
   except Exception as e:
     message = \
       f"Failed to get raw_analysis_id, error: {e}"
@@ -74,12 +73,10 @@ def find_raw_metadata_id(
 ## TASK - fetch raw analysis metadata from portal
 @task(
   task_id="fetch_raw_metadata_from_portal",
-  retries=0,
-  queue='hpc_4G',
-  multiple_outputs=False)
+  retries=1,
+  queue='hpc_4G')
 def fetch_raw_metadata_from_portal(
-    raw_analysis_id: int,
-    raw_metadata_file_tag: str = "raw_metadata_file") -> Dict[str, str]:
+    raw_analysis_id: int) -> str:
   try:
     raw_analysis_data = \
       get_data_from_portal(
@@ -101,7 +98,7 @@ def fetch_raw_metadata_from_portal(
       os.path.join(temp_dir, "raw_metadata.json")
     with open(raw_metadata_json_file, "w") as fp:
       json.dump(raw_analysis_data, fp)
-    return {raw_metadata_file_tag: raw_metadata_json_file}
+    return raw_metadata_json_file
   except Exception as e:
     message = \
       f"Failed to fetch raw_analysis_metadata, error: {e}"
@@ -141,12 +138,10 @@ def check_registered_analysis_in_db(
   task_id="check_raw_metadata_in_db",
   retry_delay=timedelta(minutes=5),
   retries=4,
-  queue='hpc_4G',
-  multiple_outputs=False)
+  queue='hpc_4G')
 def check_raw_metadata_in_db(
-    raw_metadata_file: str,
-    valid_raw_metadata_file_tag: str = "valid_raw_metadata_file") \
-      -> Dict[str, str]:
+    raw_metadata_file: str) \
+      -> str:
   try:
     check_file_path(raw_metadata_file)
     with open(raw_metadata_file, "r") as fp:
@@ -166,10 +161,9 @@ def check_raw_metadata_in_db(
         project_id=project_id,
         analysis_name=analysis_name,
         dbconf_json=DATABASE_CONFIG_FILE)
-    if analysis_reg:
-      return {valid_raw_metadata_file_tag: raw_metadata_file}
-    else:
-      return {valid_raw_metadata_file_tag: ""}
+    if not analysis_reg:
+      raw_metadata_file = ""
+    return raw_metadata_file
   except Exception as e:
     message = \
       f"Failed to check existing raw_analysis_metadata, error: {e}"
@@ -242,6 +236,21 @@ def register_analysis_in_db(
       "analysis_name": analysis_name,
       "analysis_description": analysis_json}]
     aa.store_analysis_data(data)
+    ## fetch analysis id
+    analysis_id = \
+      aa.check_analysis_record_by_analysis_name_and_project_id(
+        analysis_name=analysis_name,
+        project_id=project_id,
+        output_mode='one_or_none')
+    if analysis_id is None:
+      raise ValueError(
+        f"No analysis id found for {analysis_name}")
+    ## create pipeline seed status for analysis id and pipeline id
+    data = [{
+      'pipeline_id': pipeline_id,
+      'seed_id': analysis_id[0],
+      'seed_table': 'analysis'}]
+    pl.create_pipeline_seed(data)
     aa.close_session()
     return True
   except Exception as e:
@@ -256,10 +265,10 @@
   retries=4,
   queue='hpc_4G',
   multiple_outputs=False)
-def register_raw_analysis_metadata_in_db(valid_raw_metadata_file):
+def register_raw_analysis_metadata_in_db(valid_raw_metadata_file: str) -> bool:
   try:
     if valid_raw_metadata_file == "":
-      return {"status": False}
+      return False
     else:
       check_file_path(valid_raw_metadata_file)
       with open(valid_raw_metadata_file, "r") as fp:
@@ -281,7 +290,7 @@ def register_raw_analysis_metadata_in_db(
           analysis_name=analysis_name,
           analysis_yaml=analysis_yaml,
           dbconf_json=DATABASE_CONFIG_FILE)
-      return {"status": status}
+      return status
   except Exception as e:
     message = \
       f"Failed to register raw_analysis_metadata, error: {e}"
@@ -296,9 +305,10 @@ def register_raw_analysis_metadata_in_db(
   task_id="mark_metadata_synced_on_portal",
   retry_delay=timedelta(minutes=5),
   retries=4,
-  queue='hpc_4G',
-  multiple_outputs=False)
-def mark_metadata_synced_on_portal(raw_analysis_id, registration_status):
+  queue='hpc_4G')
+def mark_metadata_synced_on_portal(
+    raw_analysis_id: int,
+    registration_status: bool) -> None:
   try:
     if registration_status:
       _ = \
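Note: after this refactor every task in dag44 pushes a plain scalar XCom (int, str or bool) instead of a single-key dict, so downstream wiring no longer needs dict lookups. A minimal sketch of how a DAG file could chain the refactored tasks; the DAG id and the wiring are illustrative assumptions, not part of this diff:

from airflow.decorators import dag
from igf_airflow.utils.dag44_analysis_registration_utils import (
  find_raw_metadata_id,
  fetch_raw_metadata_from_portal,
  check_raw_metadata_in_db,
  register_raw_analysis_metadata_in_db,
  mark_metadata_synced_on_portal)

@dag(dag_id="dag44_analysis_registration", schedule=None)
def analysis_registration():
  ## int XCom, not {"raw_analysis_id": ...}
  raw_analysis_id = find_raw_metadata_id()
  ## str path, not {"raw_metadata_file": ...}
  raw_metadata_file = \
    fetch_raw_metadata_from_portal(raw_analysis_id=raw_analysis_id)
  ## empty string is the sentinel for "skip registration"
  valid_file = \
    check_raw_metadata_in_db(raw_metadata_file=raw_metadata_file)
  status = \
    register_raw_analysis_metadata_in_db(valid_file)
  mark_metadata_synced_on_portal(
    raw_analysis_id=raw_analysis_id,
    registration_status=status)

analysis_registration()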
136 changes: 136 additions & 0 deletions igf_airflow/utils/dag46_scRNA_10X_flex_utils.py
@@ -0,0 +1,136 @@
import os
import logging
import pandas as pd
from typing import Any
from datetime import timedelta
from airflow.models import Variable
from airflow.decorators import task
from igf_airflow.utils.generic_airflow_utils import (
  send_airflow_failed_logs_to_channels)
from igf_data.utils.fileutils import (
  check_file_path)
from igf_airflow.utils.dag34_cellranger_multi_scRNA_utils import (
  prepare_cellranger_run_dir_and_script_file,
  parse_analysis_design_and_get_metadata)

log = logging.getLogger(__name__)

MS_TEAMS_CONF = \
  Variable.get('analysis_ms_teams_conf', default_var=None)
DATABASE_CONFIG_FILE = \
  Variable.get('database_config_file', default_var=None)
HPC_BASE_RAW_DATA_PATH = \
  Variable.get('hpc_base_raw_data_path', default_var=None)
HPC_FILE_LOCATION = \
  Variable.get("hpc_file_location", default_var="HPC_PROJECT")

## CELLRANGER
CELLRANGER_MULTI_SCRIPT_TEMPLATE = \
  Variable.get("cellranger_multi_script_template", default_var=None)

def _get_cellranger_sample_group(
    sample_metadata: dict[str, Any],
    cellranger_group_name: str = 'cellranger_group',
    required_tag_name: str = 'feature_types',
    required_tag_value: str = 'Gene Expression') \
      -> list[str]:
  try:
    unique_sample_groups = set()
    required_tag_list = list()
    for _, group in sample_metadata.items():
      grp_name = group.get(cellranger_group_name)
      if grp_name is None:
        raise KeyError(
          "Missing cellranger_group in sample_metadata")
      unique_sample_groups.add(grp_name)
      required_tag_list.append({
        "name": grp_name,
        required_tag_name: group.get(required_tag_name)})
    if len(unique_sample_groups) == 0:
      raise ValueError("No sample group found")
    unique_sample_groups = \
      list(unique_sample_groups)
    required_tag_df = \
      pd.DataFrame(required_tag_list)
    ## check for required tags
    for g in unique_sample_groups:
      g_tag_values = (
        required_tag_df[required_tag_df['name']==g][required_tag_name]
        .values
        .tolist()
      )
      if required_tag_value not in g_tag_values:
        raise KeyError(
          f"No {required_tag_value} found for group {g}")
    return unique_sample_groups
  except Exception as e:
    raise ValueError(
      f'Failed to get cellranger sample group, error: {e}')
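For reference, a minimal sample_metadata shape that satisfies this helper; the sample ids and group name are made up, but the keys mirror what the loop reads:

## illustrative input only; real designs come from the analysis design YAML
sample_metadata = {
  "IGF0001": {
    "cellranger_group": "grp1",
    "feature_types": "Gene Expression"},
  "IGF0002": {
    "cellranger_group": "grp1",
    "feature_types": "Antibody Capture"}}

## returns ['grp1'], since grp1 has at least one 'Gene Expression' entry
sample_groups = \
  _get_cellranger_sample_group(sample_metadata=sample_metadata)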


## TASK
@task(
  task_id="prepare_cellranger_flex_script",
  retry_delay=timedelta(minutes=5),
  retries=4,
  queue='hpc_4G')
def prepare_cellranger_flex_script(
    design_dict: dict,
    work_dir: str,
    analysis_design_tag: str = "analysis_design",
    cellranger_group_name: str = 'cellranger_group',
    required_tag_name: str = 'feature_types',
    required_tag_value: str = 'Gene Expression') -> dict:
  """
  Create cellranger flex script
  """
  try:
    ## get sample metadata
    design_file = \
      design_dict.get(analysis_design_tag)
    check_file_path(design_file)
    with open(design_file, 'r') as fp:
      input_design_yaml = fp.read()
    sample_metadata, analysis_metadata = \
      parse_analysis_design_and_get_metadata(
        input_design_yaml=input_design_yaml)
    if sample_metadata is None or \
       analysis_metadata is None:
      raise KeyError(
        "Missing sample or analysis metadata")
    ## collect sample groups and check required tags
    sample_groups = \
      _get_cellranger_sample_group(
        sample_metadata=sample_metadata,
        required_tag_name=required_tag_name,
        required_tag_value=required_tag_value)
    if len(sample_groups) == 0:
      raise ValueError(
        "No sample group has been found, file: " + \
        f"{design_file}")
    ## use the first group; a flex run expects a single cellranger group
    sample_group = \
      sample_groups[0]
    _, run_script_file = \
      prepare_cellranger_run_dir_and_script_file(
        sample_group=str(sample_group),
        work_dir=work_dir,
        output_dir=os.path.join(work_dir, str(sample_group)),
        design_file=design_file,
        db_config_file=DATABASE_CONFIG_FILE,
        run_script_template=CELLRANGER_MULTI_SCRIPT_TEMPLATE,
        cellranger_group_tag=cellranger_group_name,
        feature_types_tag=required_tag_name)
    analysis_script_info = {
      "sample_group": sample_group,
      "run_script": run_script_file,
      "output_dir": os.path.join(work_dir, str(sample_group))}
    return analysis_script_info
  except Exception as e:
    log.error(e)
    send_airflow_failed_logs_to_channels(
      ms_teams_conf=MS_TEAMS_CONF,
      message_prefix=e)
    raise ValueError(e)
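A sketch of how this task could sit in the flex DAG; the upstream tasks here are placeholders, since they are not part of this diff:

from airflow.decorators import dag, task

@task
def fetch_analysis_design() -> dict:
  ## placeholder: the real DAG would stage the design file first
  return {"analysis_design": "/path/to/design.yaml"}

@task
def create_work_dir() -> str:
  ## placeholder work dir
  return "/tmp/flex_work"

@dag(dag_id="dag46_scRNA_10X_flex_example", schedule=None)
def flex_pipeline():
  script_info = \
    prepare_cellranger_flex_script(
      design_dict=fetch_analysis_design(),
      work_dir=create_work_dir())

flex_pipeline()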
6 changes: 4 additions & 2 deletions igf_airflow/utils/generic_airflow_tasks.py
@@ -219,13 +219,15 @@ def mark_analysis_failed(
   queue='hpc_4G')
 def send_email_to_user(
     send_email: bool = True,
-    email_user_key: str = 'username') -> None:
+    email_user_key: str = 'username',
+    analysis_email_template: str = ANALYSES_EMAIL_CONFIG) -> None:
   """
   An Airflow task for sending email to registered users for updating analysis pipeline status
 
   Parameters:
     send_email (bool): A toggle for sending email to primary user if "True" (default) or fall back to default user if "False"
     email_user_key (str): Key for the default user as mentioned in the email config file, default is 'username'
+    analysis_email_template (str): Path to the email template file, default is ANALYSES_EMAIL_CONFIG
   """
   try:
     ## dag_run.conf should have analysis_id
@@ -254,7 +256,7 @@ def send_email_to_user(
     email_text_file, receivers = \
       generate_email_text_for_analysis(
         analysis_id=analysis_id,
-        template_path=ANALYSES_EMAIL_CONFIG,
+        template_path=analysis_email_template,
         dbconfig_file=DATABASE_CONFIG_FILE,
         default_email_user=default_email_user,
         send_email_to_user=send_email)
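Making the template path a parameter lets individual DAGs swap in their own email copy without touching the module default; a small sketch, where the Variable name is an assumption:

## assumed Variable name, shown for illustration only
custom_template = \
  Variable.get("custom_analysis_email_template", default_var=None)
notify = \
  send_email_to_user(
    send_email=True,
    analysis_email_template=custom_template or ANALYSES_EMAIL_CONFIG)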
4 changes: 3 additions & 1 deletion template/geomx/geomx_ngs_pipeline.sh
@@ -1,6 +1,8 @@
 #!/bin/bash
 
 set -eo pipefail
+eval "$(~/anaconda3/bin/conda shell.bash hook)"
+source activate expect
 
 cd {{ WORK_DIR }}
 
@@ -17,4 +19,4 @@ script=$(cat << EOF
expect "*All done*"
EOF
)
/usr/bin/expect -c "$script"
expect -c "$script"