From 898583b29d91fba905f9ccb6366f0d7bbaad7419 Mon Sep 17 00:00:00 2001 From: Sebastian Benjamin Date: Wed, 29 Oct 2025 11:36:23 -0700 Subject: [PATCH] Make fastq-to-bam resolve umis within cbs rather than globally --- nimble/__main__.py | 6 +- nimble/fastq_barcode_processor.py | 157 ++++++++++++++++++++---------- 2 files changed, 110 insertions(+), 53 deletions(-) diff --git a/nimble/__main__.py b/nimble/__main__.py index c61d163..2b59212 100755 --- a/nimble/__main__.py +++ b/nimble/__main__.py @@ -424,8 +424,7 @@ def sort_input_bam(bam, cores, tmp_dir): fastq_to_bam_parser = subparsers.add_parser('fastq-to-bam') fastq_to_bam_parser.add_argument('--r1-fastq', help='Path to R1 FASTQ file.', type=str, required=True) fastq_to_bam_parser.add_argument('--r2-fastq', help='Path to R2 FASTQ file.', type=str, required=True) - fastq_to_bam_parser.add_argument('--cell-barcodes', help='Path to cell barcode mapping TSV file.', type=str, required=True) - fastq_to_bam_parser.add_argument('--umi-mapping', help='Path to UMI mapping TSV file.', type=str, required=True) + fastq_to_bam_parser.add_argument("--map", required=True, help="TSV(.gz) with columns: rawCB, correctedCB, rawUMI, correctedUMI") fastq_to_bam_parser.add_argument('--output', help='Path for output BAM file.', type=str, required=True) fastq_to_bam_parser.add_argument('-c', '--num_cores', help='The number of cores to use for processing.', type=int, default=1) @@ -444,8 +443,7 @@ def sort_input_bam(bam, cores, tmp_dir): fastq_to_bam_with_barcodes( args.r1_fastq, args.r2_fastq, - args.cell_barcodes, - args.umi_mapping, + args.map, args.output, args.num_cores ) diff --git a/nimble/fastq_barcode_processor.py b/nimble/fastq_barcode_processor.py index 65ca47d..8b77785 100644 --- a/nimble/fastq_barcode_processor.py +++ b/nimble/fastq_barcode_processor.py @@ -6,94 +6,131 @@ import pysam from Bio import SeqIO -def load_barcode_mapping(tsv_path): +def load_cb_umi_mapping(tsv_path): """ - Load a TSV file with Raw->Corrected barcode mappings into a dictionary. - Handles gzipped files automatically. + Load a TSV with 4 columns: + 1) Raw CB + 2) Corrected CB + 3) Raw UMI + 4) Corrected UMI + + Returns: + cb_map: dict[str raw_cb] = str corrected_cb + umi_map_by_raw_cb: dict[str raw_cb] = dict[str raw_umi] = str corrected_umi """ - barcode_map = {} + cb_map = {} + umi_map_by_raw_cb = defaultdict(dict) open_func = gzip.open if tsv_path.endswith('.gz') else open mode = 'rt' if tsv_path.endswith('.gz') else 'r' + line_ct = 0 + malformed = 0 + pairs_ct = 0 + unique_cb = set() + try: with open_func(tsv_path, mode) as f: - header = f.readline().strip().split('\t') - if len(header) != 2: - raise ValueError(f"Expected 2 columns in TSV header, got {len(header)}") - - for line_num, line in enumerate(f, start=2): + for line_num, line in enumerate(f, start=1): line = line.strip() if not line: continue parts = line.split('\t') - if len(parts) != 2: - print(f"Warning: Skipping malformed line {line_num} in {tsv_path}: {line}") + if len(parts) < 4: + malformed += 1 continue - raw_barcode, corrected_barcode = parts - barcode_map[raw_barcode] = corrected_barcode + raw_cb, corr_cb, raw_umi, corr_umi = parts[0], parts[1], parts[2], parts[3] + cb_map[raw_cb] = corr_cb + umi_map_by_raw_cb[raw_cb][raw_umi] = corr_umi + pairs_ct += 1 + unique_cb.add(raw_cb) + line_ct += 1 except Exception as e: - print(f"Error loading barcode mapping from {tsv_path}: {e}") + print(f"Error loading CB/UMI mapping from {tsv_path}: {e}", file=sys.stderr) sys.exit(1) - print(f"Loaded {len(barcode_map)} barcode mappings from {tsv_path}") - return barcode_map + print(f"Loaded mapping from {tsv_path}") + print(f" Lines read: {line_ct + malformed}") + print(f" Malformed lines skipped (<4 cols): {malformed}") + print(f" Unique raw CBs: {len(unique_cb)}") + print(f" (raw CB, raw UMI) pairs: {pairs_ct}") + return cb_map, umi_map_by_raw_cb def parse_10x_barcode_from_r1(sequence, cb_length=16, umi_length=12): """ Parse cell barcode and UMI from 10x R1 read sequence. + Returns (cell_barcode, umi, remaining_sequence) """ if len(sequence) < cb_length + umi_length: return None, None, "" - cell_barcode = sequence[:cb_length] umi = sequence[cb_length:cb_length + umi_length] remaining_sequence = sequence[cb_length + umi_length:] - return cell_barcode, umi, remaining_sequence -def process_pair(r1_record, r2_record, cb_map, umi_map): +def process_pair(r1_record, r2_record, cb_map, umi_map_by_raw_cb, stats, cb_length=16, umi_length=12): """ Process a single FASTQ pair and return (r1_bam, r2_bam), or None if skipped. + Mapping logic: + - Look up corrected CB by raw CB. + - Look up corrected UMI by (raw CB, raw UMI). + Both must exist to emit a pair. """ + # Normalize names (handle /1 and /2 suffixes if present) r1_name = r1_record.id.removesuffix('/1') r2_name = r2_record.id.removesuffix('/2') if r1_name != r2_name: + stats['name_mismatch'] += 1 return None r1_seq = str(r1_record.seq) - cell_barcode, umi, remaining_r1_seq = parse_10x_barcode_from_r1(r1_seq) - if cell_barcode is None or umi is None or len(remaining_r1_seq) == 0: + cell_barcode, umi, remaining_r1_seq = parse_10x_barcode_from_r1(r1_seq, cb_length, umi_length) + if cell_barcode is None or umi is None: + stats['too_short'] += 1 + return None + if len(remaining_r1_seq) == 0: + stats['no_remaining_seq'] += 1 return None + # CB and UMI corrections (UMI in context of *raw* CB) corrected_cb = cb_map.get(cell_barcode) - corrected_umi = umi_map.get(umi) - if corrected_cb is None or corrected_umi is None: + if corrected_cb is None: + stats['cb_not_in_map'] += 1 return None - barcode_length = len(cell_barcode) + len(umi) + umi_map_for_cb = umi_map_by_raw_cb.get(cell_barcode) + if not umi_map_for_cb: + stats['cb_has_no_umi_map'] += 1 + return None + + corrected_umi = umi_map_for_cb.get(umi) + if corrected_umi is None: + stats['umi_not_in_map_for_cb'] += 1 + return None - # R1 BAM + barcode_length = cb_length + umi_length + + # Build unaligned BAM records r1_bam = pysam.AlignedSegment() r1_bam.query_name = r1_name r1_bam.query_sequence = remaining_r1_seq + # Biopython stores qualities as list[int]; slice past CB+UMI r1_bam.query_qualities = r1_record.letter_annotations["phred_quality"][barcode_length:] - r1_bam.flag = 77 + r1_bam.flag = 77 # 0x004D: paired, first in pair, unmapped, mate unmapped r1_bam.reference_id = -1 r1_bam.reference_start = -1 r1_bam.mapping_quality = 0 r1_bam.set_tag("CB", corrected_cb) r1_bam.set_tag("UB", corrected_umi) - # R2 BAM r2_bam = pysam.AlignedSegment() r2_bam.query_name = r2_name r2_bam.query_sequence = str(r2_record.seq) r2_bam.query_qualities = r2_record.letter_annotations["phred_quality"] - r2_bam.flag = 141 + r2_bam.flag = 141 # 0x008D: paired, second in pair, unmapped, mate unmapped r2_bam.reference_id = -1 r2_bam.reference_start = -1 r2_bam.mapping_quality = 0 @@ -103,13 +140,20 @@ def process_pair(r1_record, r2_record, cb_map, umi_map): return r1_bam, r2_bam -def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_file, output_bam, num_cores=1): +def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_umi_mapping_file, output_bam, num_cores=1, cb_length=16, umi_length=12): """ Convert paired FASTQ files to unaligned BAM with CB/UB tags using multiple threads. + + Args: + r1_fastq: path to R1 FASTQ(.gz) + r2_fastq: path to R2 FASTQ(.gz) + cb_umi_mapping_file: TSV(.gz) with 4 columns (rawCB, corrCB, rawUMI, corrUMI) + output_bam: path to output BAM + num_cores: threads + cb_length, umi_length: lengths for parsing R1 """ - print("Loading barcode mappings...") - cb_map = load_barcode_mapping(cb_mapping_file) - umi_map = load_barcode_mapping(umi_mapping_file) + print("Loading CB/UMI mapping...") + cb_map, umi_map_by_raw_cb = load_cb_umi_mapping(cb_umi_mapping_file) stats = defaultdict(int) @@ -119,8 +163,8 @@ def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_ r2_mode = 'rt' if r2_fastq.endswith('.gz') else 'r' header = { - 'HD': {'VN': '1.4', 'SO': 'queryname'}, - 'PG': [{'ID': 'nimble-fastq-to-bam', 'PN': 'nimble', 'VN': '1.0'}] + 'HD': {'VN': '1.6', 'SO': 'queryname'}, + 'PG': [{'ID': 'nimble-fastq-to-bam', 'PN': 'nimble', 'VN': '1.1', 'CL': 'single-tsv, cb-scoped-umi'}] } print(f"Processing paired FASTQ files with {num_cores} threads...") @@ -137,39 +181,54 @@ def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_ futures = {} for idx, (r1_record, r2_record) in enumerate(zip(r1_iter, r2_iter), start=1): - stats['total_reads'] += 1 - fut = executor.submit(process_pair, r1_record, r2_record, cb_map, umi_map) - futures[fut] = idx + stats['total_pairs'] += 1 + fut = executor.submit(process_pair, r1_record, r2_record, + cb_map, umi_map_by_raw_cb, stats, cb_length, umi_length) + futures[fut] = True # Throttle in-flight futures to limit memory use if len(futures) >= num_cores * 100: - for done in as_completed(list(futures)[:num_cores]): - result = done.result() - if result: - r1_bam, r2_bam = result + done_any = 0 + for done in as_completed(list(futures)[:num_cores * 10]): + res = done.result() + if res: + r1_bam, r2_bam = res bam_out.write(r1_bam) bam_out.write(r2_bam) stats['written_pairs'] += 1 del futures[done] + done_any += 1 + if done_any >= num_cores * 10: + break - if stats['total_reads'] % 1000000 == 0: - print(f"Processed {stats['total_reads']} reads...") + if stats['total_pairs'] % 1_000_000 == 0: + print(f"Processed {stats['total_pairs']} read pairs...") # Drain remaining futures - for done in as_completed(futures): - result = done.result() - if result: - r1_bam, r2_bam = result + for done in as_completed(list(futures.keys())): + res = done.result() + if res: + r1_bam, r2_bam = res bam_out.write(r1_bam) bam_out.write(r2_bam) stats['written_pairs'] += 1 + del futures[done] except Exception as e: - print(f"Error during processing: {e}") + print(f"Error during processing: {e}", file=sys.stderr) sys.exit(1) # Print summary print("\n=== Processing Statistics ===") + ordered = [ + 'total_pairs', 'written_pairs', + 'name_mismatch', 'too_short', 'no_remaining_seq', + 'cb_not_in_map', 'cb_has_no_umi_map', 'umi_not_in_map_for_cb' + ] + for k in ordered: + print(f"{k.replace('_', ' ').capitalize()}: {stats.get(k, 0)}") + # Print any other counters encountered for k, v in stats.items(): - print(f"{k.replace('_', ' ').capitalize()}: {v}") + if k not in ordered: + print(f"{k.replace('_', ' ').capitalize()}: {v}") print(f"Output BAM written to: {output_bam}") \ No newline at end of file