Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions nimble/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ def sort_input_bam(bam, cores, tmp_dir):
fastq_to_bam_parser = subparsers.add_parser('fastq-to-bam')
fastq_to_bam_parser.add_argument('--r1-fastq', help='Path to R1 FASTQ file.', type=str, required=True)
fastq_to_bam_parser.add_argument('--r2-fastq', help='Path to R2 FASTQ file.', type=str, required=True)
fastq_to_bam_parser.add_argument('--cell-barcodes', help='Path to cell barcode mapping TSV file.', type=str, required=True)
fastq_to_bam_parser.add_argument('--umi-mapping', help='Path to UMI mapping TSV file.', type=str, required=True)
fastq_to_bam_parser.add_argument("--map", required=True, help="TSV(.gz) with columns: rawCB, correctedCB, rawUMI, correctedUMI")
fastq_to_bam_parser.add_argument('--output', help='Path for output BAM file.', type=str, required=True)
fastq_to_bam_parser.add_argument('-c', '--num_cores', help='The number of cores to use for processing.', type=int, default=1)

Expand All @@ -444,8 +443,7 @@ def sort_input_bam(bam, cores, tmp_dir):
fastq_to_bam_with_barcodes(
args.r1_fastq,
args.r2_fastq,
args.cell_barcodes,
args.umi_mapping,
args.map,
args.output,
args.num_cores
)
Expand Down
157 changes: 108 additions & 49 deletions nimble/fastq_barcode_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,94 +6,131 @@
import pysam
from Bio import SeqIO

def load_barcode_mapping(tsv_path):
def load_cb_umi_mapping(tsv_path):
"""
Load a TSV file with Raw->Corrected barcode mappings into a dictionary.
Handles gzipped files automatically.
Load a TSV with 4 columns:
1) Raw CB
2) Corrected CB
3) Raw UMI
4) Corrected UMI

Returns:
cb_map: dict[str raw_cb] = str corrected_cb
umi_map_by_raw_cb: dict[str raw_cb] = dict[str raw_umi] = str corrected_umi
"""
barcode_map = {}
cb_map = {}
umi_map_by_raw_cb = defaultdict(dict)

open_func = gzip.open if tsv_path.endswith('.gz') else open
mode = 'rt' if tsv_path.endswith('.gz') else 'r'

line_ct = 0
malformed = 0
pairs_ct = 0
unique_cb = set()

try:
with open_func(tsv_path, mode) as f:
header = f.readline().strip().split('\t')
if len(header) != 2:
raise ValueError(f"Expected 2 columns in TSV header, got {len(header)}")

for line_num, line in enumerate(f, start=2):
for line_num, line in enumerate(f, start=1):
line = line.strip()
if not line:
continue
parts = line.split('\t')
if len(parts) != 2:
print(f"Warning: Skipping malformed line {line_num} in {tsv_path}: {line}")
if len(parts) < 4:
malformed += 1
continue
raw_barcode, corrected_barcode = parts
barcode_map[raw_barcode] = corrected_barcode

raw_cb, corr_cb, raw_umi, corr_umi = parts[0], parts[1], parts[2], parts[3]
cb_map[raw_cb] = corr_cb
umi_map_by_raw_cb[raw_cb][raw_umi] = corr_umi
pairs_ct += 1
unique_cb.add(raw_cb)
line_ct += 1
except Exception as e:
print(f"Error loading barcode mapping from {tsv_path}: {e}")
print(f"Error loading CB/UMI mapping from {tsv_path}: {e}", file=sys.stderr)
sys.exit(1)

print(f"Loaded {len(barcode_map)} barcode mappings from {tsv_path}")
return barcode_map
print(f"Loaded mapping from {tsv_path}")
print(f" Lines read: {line_ct + malformed}")
print(f" Malformed lines skipped (<4 cols): {malformed}")
print(f" Unique raw CBs: {len(unique_cb)}")
print(f" (raw CB, raw UMI) pairs: {pairs_ct}")
return cb_map, umi_map_by_raw_cb


def parse_10x_barcode_from_r1(sequence, cb_length=16, umi_length=12):
"""
Parse cell barcode and UMI from 10x R1 read sequence.
Returns (cell_barcode, umi, remaining_sequence)
"""
if len(sequence) < cb_length + umi_length:
return None, None, ""

cell_barcode = sequence[:cb_length]
umi = sequence[cb_length:cb_length + umi_length]
remaining_sequence = sequence[cb_length + umi_length:]

return cell_barcode, umi, remaining_sequence


def process_pair(r1_record, r2_record, cb_map, umi_map):
def process_pair(r1_record, r2_record, cb_map, umi_map_by_raw_cb, stats, cb_length=16, umi_length=12):
"""
Process a single FASTQ pair and return (r1_bam, r2_bam), or None if skipped.
Mapping logic:
- Look up corrected CB by raw CB.
- Look up corrected UMI by (raw CB, raw UMI).
Both must exist to emit a pair.
"""
# Normalize names (handle /1 and /2 suffixes if present)
r1_name = r1_record.id.removesuffix('/1')
r2_name = r2_record.id.removesuffix('/2')
if r1_name != r2_name:
stats['name_mismatch'] += 1
return None

r1_seq = str(r1_record.seq)
cell_barcode, umi, remaining_r1_seq = parse_10x_barcode_from_r1(r1_seq)
if cell_barcode is None or umi is None or len(remaining_r1_seq) == 0:
cell_barcode, umi, remaining_r1_seq = parse_10x_barcode_from_r1(r1_seq, cb_length, umi_length)
if cell_barcode is None or umi is None:
stats['too_short'] += 1
return None
if len(remaining_r1_seq) == 0:
stats['no_remaining_seq'] += 1
return None

# CB and UMI corrections (UMI in context of *raw* CB)
corrected_cb = cb_map.get(cell_barcode)
corrected_umi = umi_map.get(umi)
if corrected_cb is None or corrected_umi is None:
if corrected_cb is None:
stats['cb_not_in_map'] += 1
return None

barcode_length = len(cell_barcode) + len(umi)
umi_map_for_cb = umi_map_by_raw_cb.get(cell_barcode)
if not umi_map_for_cb:
stats['cb_has_no_umi_map'] += 1
return None

corrected_umi = umi_map_for_cb.get(umi)
if corrected_umi is None:
stats['umi_not_in_map_for_cb'] += 1
return None

# R1 BAM
barcode_length = cb_length + umi_length

# Build unaligned BAM records
r1_bam = pysam.AlignedSegment()
r1_bam.query_name = r1_name
r1_bam.query_sequence = remaining_r1_seq
# Biopython stores qualities as list[int]; slice past CB+UMI
r1_bam.query_qualities = r1_record.letter_annotations["phred_quality"][barcode_length:]
r1_bam.flag = 77
r1_bam.flag = 77 # 0x004D: paired, first in pair, unmapped, mate unmapped
r1_bam.reference_id = -1
r1_bam.reference_start = -1
r1_bam.mapping_quality = 0
r1_bam.set_tag("CB", corrected_cb)
r1_bam.set_tag("UB", corrected_umi)

# R2 BAM
r2_bam = pysam.AlignedSegment()
r2_bam.query_name = r2_name
r2_bam.query_sequence = str(r2_record.seq)
r2_bam.query_qualities = r2_record.letter_annotations["phred_quality"]
r2_bam.flag = 141
r2_bam.flag = 141 # 0x008D: paired, second in pair, unmapped, mate unmapped
r2_bam.reference_id = -1
r2_bam.reference_start = -1
r2_bam.mapping_quality = 0
Expand All @@ -103,13 +140,20 @@ def process_pair(r1_record, r2_record, cb_map, umi_map):
return r1_bam, r2_bam


def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_file, output_bam, num_cores=1):
def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_umi_mapping_file, output_bam, num_cores=1, cb_length=16, umi_length=12):
"""
Convert paired FASTQ files to unaligned BAM with CB/UB tags using multiple threads.

Args:
r1_fastq: path to R1 FASTQ(.gz)
r2_fastq: path to R2 FASTQ(.gz)
cb_umi_mapping_file: TSV(.gz) with 4 columns (rawCB, corrCB, rawUMI, corrUMI)
output_bam: path to output BAM
num_cores: threads
cb_length, umi_length: lengths for parsing R1
"""
print("Loading barcode mappings...")
cb_map = load_barcode_mapping(cb_mapping_file)
umi_map = load_barcode_mapping(umi_mapping_file)
print("Loading CB/UMI mapping...")
cb_map, umi_map_by_raw_cb = load_cb_umi_mapping(cb_umi_mapping_file)

stats = defaultdict(int)

Expand All @@ -119,8 +163,8 @@ def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_
r2_mode = 'rt' if r2_fastq.endswith('.gz') else 'r'

header = {
'HD': {'VN': '1.4', 'SO': 'queryname'},
'PG': [{'ID': 'nimble-fastq-to-bam', 'PN': 'nimble', 'VN': '1.0'}]
'HD': {'VN': '1.6', 'SO': 'queryname'},
'PG': [{'ID': 'nimble-fastq-to-bam', 'PN': 'nimble', 'VN': '1.1', 'CL': 'single-tsv, cb-scoped-umi'}]
}

print(f"Processing paired FASTQ files with {num_cores} threads...")
Expand All @@ -137,39 +181,54 @@ def fastq_to_bam_with_barcodes(r1_fastq, r2_fastq, cb_mapping_file, umi_mapping_
futures = {}

for idx, (r1_record, r2_record) in enumerate(zip(r1_iter, r2_iter), start=1):
stats['total_reads'] += 1
fut = executor.submit(process_pair, r1_record, r2_record, cb_map, umi_map)
futures[fut] = idx
stats['total_pairs'] += 1
fut = executor.submit(process_pair, r1_record, r2_record,
cb_map, umi_map_by_raw_cb, stats, cb_length, umi_length)
futures[fut] = True

# Throttle in-flight futures to limit memory use
if len(futures) >= num_cores * 100:
for done in as_completed(list(futures)[:num_cores]):
result = done.result()
if result:
r1_bam, r2_bam = result
done_any = 0
for done in as_completed(list(futures)[:num_cores * 10]):
res = done.result()
if res:
r1_bam, r2_bam = res
bam_out.write(r1_bam)
bam_out.write(r2_bam)
stats['written_pairs'] += 1
del futures[done]
done_any += 1
if done_any >= num_cores * 10:
break

if stats['total_reads'] % 1000000 == 0:
print(f"Processed {stats['total_reads']} reads...")
if stats['total_pairs'] % 1_000_000 == 0:
print(f"Processed {stats['total_pairs']} read pairs...")

# Drain remaining futures
for done in as_completed(futures):
result = done.result()
if result:
r1_bam, r2_bam = result
for done in as_completed(list(futures.keys())):
res = done.result()
if res:
r1_bam, r2_bam = res
bam_out.write(r1_bam)
bam_out.write(r2_bam)
stats['written_pairs'] += 1
del futures[done]

except Exception as e:
print(f"Error during processing: {e}")
print(f"Error during processing: {e}", file=sys.stderr)
sys.exit(1)

# Print summary
print("\n=== Processing Statistics ===")
ordered = [
'total_pairs', 'written_pairs',
'name_mismatch', 'too_short', 'no_remaining_seq',
'cb_not_in_map', 'cb_has_no_umi_map', 'umi_not_in_map_for_cb'
]
for k in ordered:
print(f"{k.replace('_', ' ').capitalize()}: {stats.get(k, 0)}")
# Print any other counters encountered
for k, v in stats.items():
print(f"{k.replace('_', ' ').capitalize()}: {v}")
if k not in ordered:
print(f"{k.replace('_', ' ').capitalize()}: {v}")
print(f"Output BAM written to: {output_bam}")