Source code for crabml.io.sequences

"""
Sequence file parsing and alignment handling.
"""

import re
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import numpy as np


# Genetic code tables (standard code)
GENETIC_CODE = {
    'TTT': 'F', 'TTC': 'F', 'TTA': 'L', 'TTG': 'L',
    'TCT': 'S', 'TCC': 'S', 'TCA': 'S', 'TCG': 'S',
    'TAT': 'Y', 'TAC': 'Y', 'TAA': '*', 'TAG': '*',
    'TGT': 'C', 'TGC': 'C', 'TGA': '*', 'TGG': 'W',
    'CTT': 'L', 'CTC': 'L', 'CTA': 'L', 'CTG': 'L',
    'CCT': 'P', 'CCC': 'P', 'CCA': 'P', 'CCG': 'P',
    'CAT': 'H', 'CAC': 'H', 'CAA': 'Q', 'CAG': 'Q',
    'CGT': 'R', 'CGC': 'R', 'CGA': 'R', 'CGG': 'R',
    'ATT': 'I', 'ATC': 'I', 'ATA': 'I', 'ATG': 'M',
    'ACT': 'T', 'ACC': 'T', 'ACA': 'T', 'ACG': 'T',
    'AAT': 'N', 'AAC': 'N', 'AAA': 'K', 'AAG': 'K',
    'AGT': 'S', 'AGC': 'S', 'AGA': 'R', 'AGG': 'R',
    'GTT': 'V', 'GTC': 'V', 'GTA': 'V', 'GTG': 'V',
    'GCT': 'A', 'GCC': 'A', 'GCA': 'A', 'GCG': 'A',
    'GAT': 'D', 'GAC': 'D', 'GAA': 'E', 'GAG': 'E',
    'GGT': 'G', 'GGC': 'G', 'GGA': 'G', 'GGG': 'G',
}

# Build codon to index mapping (excluding stop codons)
# IMPORTANT: Order must match PAML's FROM61 array!
# PAML uses nucleotide indices: T=0, C=1, A=2, G=3
# Codon i has index: n0*16 + n1*4 + n2
_INDEX_TO_NUCLEOTIDE_PAML = {0: 'T', 1: 'C', 2: 'A', 3: 'G'}
CODONS = []
for i in range(64):
    n0 = i // 16
    n1 = (i // 4) % 4
    n2 = i % 4
    codon = _INDEX_TO_NUCLEOTIDE_PAML[n0] + _INDEX_TO_NUCLEOTIDE_PAML[n1] + _INDEX_TO_NUCLEOTIDE_PAML[n2]
    if GENETIC_CODE[codon] != '*':  # Skip stop codons (TAA, TAG, TGA)
        CODONS.append(codon)

CODON_TO_INDEX = {codon: i for i, codon in enumerate(CODONS)}
INDEX_TO_CODON = {i: codon for i, codon in enumerate(CODONS)}

# Special codes for missing/ambiguous data
GAP_CODE = 64  # Gap codon (---)
UNKNOWN_CODE = -1  # Unknown/stop codon

# Nucleotide encoding
NUCLEOTIDE_TO_INDEX = {'T': 0, 'C': 1, 'A': 2, 'G': 3}
INDEX_TO_NUCLEOTIDE = {0: 'T', 1: 'C', 2: 'A', 3: 'G'}

# Amino acid encoding
AMINO_ACIDS = 'ARNDCQEGHILKMFPSTWYV'
AA_TO_INDEX = {aa: i for i, aa in enumerate(AMINO_ACIDS)}
INDEX_TO_AA = {i: aa for i, aa in enumerate(AMINO_ACIDS)}


[docs] @dataclass class Alignment: """ Multiple sequence alignment. Attributes ---------- names : list[str] Sequence names/labels sequences : ndarray, shape (n_species, n_sites) Encoded sequences as integer arrays n_species : int Number of sequences n_sites : int Number of sites (alignment length) seqtype : str Sequence type ('codon', 'aa', 'dna') """ names: list[str] sequences: np.ndarray n_species: int n_sites: int seqtype: str
[docs] @classmethod def from_phylip( cls, filepath: Path | str, seqtype: str = "codon" ) -> "Alignment": """ Parse PHYLIP format alignment file. Custom parser for PAML-style PHYLIP format (sequential). The first line contains n_sequences and sequence_length. Each sequence starts with a name line, followed by sequence data. Parameters ---------- filepath : Path or str Path to PHYLIP format file seqtype : str Sequence type: 'codon', 'aa', or 'dna' Returns ------- Alignment Parsed alignment Examples -------- >>> aln = Alignment.from_phylip("lysozyme.txt", seqtype='codon') >>> aln.n_species 7 >>> aln.n_sites 130 """ filepath = Path(filepath) with open(filepath, 'r') as f: lines = [line.rstrip() for line in f.readlines()] # Parse header header = lines[0].strip().split() n_species = int(header[0]) n_chars = int(header[1]) # Calculate n_sites if seqtype == 'codon': if n_chars % 3 != 0: raise ValueError(f"Codon sequence length {n_chars} not divisible by 3") n_sites = n_chars // 3 else: n_sites = n_chars # Parse sequences names = [] sequences_raw = [] i = 1 while i < len(lines) and len(names) < n_species: line = lines[i].strip() i += 1 # Skip empty lines if not line: continue # This is a sequence name name = line names.append(name) # Collect sequence data until we have enough chars or hit next name seq_data = "" while i < len(lines): line = lines[i].strip() # Skip empty lines if not line: i += 1 continue # Check if this is DNA/protein sequence data clean = re.sub(r'\s', '', line).upper() # If we have enough characters, we're done with this sequence if len(re.sub(r'\s', '', seq_data)) >= n_chars: break # Add this line to sequence seq_data += clean i += 1 sequences_raw.append(seq_data) if len(names) != n_species: raise ValueError(f"Expected {n_species} sequences, found {len(names)}") # Verify all sequences have correct length for i, seq in enumerate(sequences_raw): if len(seq) != n_chars: raise ValueError( f"Sequence {names[i]} has length {len(seq)}, expected {n_chars}" ) # Encode sequences if seqtype == 'codon': encoded = cls._encode_codons(sequences_raw) elif seqtype == 'dna': encoded = cls._encode_nucleotides(sequences_raw) elif seqtype == 'aa': encoded = cls._encode_amino_acids(sequences_raw) else: raise ValueError(f"Unknown seqtype: {seqtype}") return cls( names=names, sequences=encoded, n_species=n_species, n_sites=n_sites, seqtype=seqtype, )
[docs] @classmethod def from_fasta( cls, filepath: Path | str, seqtype: str = "codon" ) -> "Alignment": """ Parse FASTA format alignment file. Parameters ---------- filepath : Path or str Path to FASTA format file seqtype : str Sequence type: 'codon', 'aa', or 'dna' Returns ------- Alignment Parsed alignment Examples -------- >>> aln = Alignment.from_fasta("alignment.fasta", seqtype='codon') """ filepath = Path(filepath) names = [] sequences_raw = [] with open(filepath, 'r') as f: current_name = None current_seq = [] for line in f: line = line.strip() if not line: continue if line.startswith('>'): # Save previous sequence if exists if current_name is not None: names.append(current_name) sequences_raw.append(''.join(current_seq)) # Start new sequence current_name = line[1:].strip() current_seq = [] else: # Add to current sequence current_seq.append(line.upper()) # Don't forget last sequence if current_name is not None: names.append(current_name) sequences_raw.append(''.join(current_seq)) if not names: raise ValueError("No sequences found in FASTA file") # Remove spaces but KEEP gaps (PAML cleandata=0 behavior) sequences_clean = [re.sub(r'\s', '', seq) for seq in sequences_raw] # Check all sequences same length (WITH gaps intact) seq_lengths = [len(seq) for seq in sequences_clean] if len(set(seq_lengths)) > 1: raise ValueError( f"Sequences have different lengths: {set(seq_lengths)}" ) n_species = len(names) n_chars = len(sequences_clean[0]) # Calculate n_sites if seqtype == 'codon': if n_chars % 3 != 0: raise ValueError(f"Codon sequence length {n_chars} not divisible by 3") n_sites = n_chars // 3 else: n_sites = n_chars # Encode sequences if seqtype == 'codon': encoded = cls._encode_codons(sequences_clean) elif seqtype == 'dna': encoded = cls._encode_nucleotides(sequences_clean) elif seqtype == 'aa': encoded = cls._encode_amino_acids(sequences_clean) else: raise ValueError(f"Unknown seqtype: {seqtype}") return cls( names=names, sequences=encoded, n_species=n_species, n_sites=n_sites, seqtype=seqtype, )
@staticmethod def _encode_codons(sequences: list[str]) -> np.ndarray: """ Encode codon sequences as integer arrays. Parameters ---------- sequences : list[str] List of nucleotide sequences (length divisible by 3) Returns ------- encoded : ndarray, shape (n_sequences, n_codons) Encoded sequences where each element is a codon index (0-60) or GAP_CODE (64) for gaps (---) or UNKNOWN_CODE (-1) for invalid """ n_sequences = len(sequences) n_codons = len(sequences[0]) // 3 encoded = np.zeros((n_sequences, n_codons), dtype=np.int8) for i, seq in enumerate(sequences): for j in range(n_codons): codon = seq[j * 3 : j * 3 + 3] if codon == '---': # Gap codon (missing data) encoded[i, j] = GAP_CODE elif codon in CODON_TO_INDEX: # Valid codon encoded[i, j] = CODON_TO_INDEX[codon] else: # Stop codon or invalid - mark as unknown encoded[i, j] = UNKNOWN_CODE return encoded @staticmethod def _encode_nucleotides(sequences: list[str]) -> np.ndarray: """Encode DNA sequences as integer arrays (0=T, 1=C, 2=A, 3=G).""" n_sequences = len(sequences) n_sites = len(sequences[0]) encoded = np.zeros((n_sequences, n_sites), dtype=np.int8) for i, seq in enumerate(sequences): for j, nucleotide in enumerate(seq): if nucleotide in NUCLEOTIDE_TO_INDEX: encoded[i, j] = NUCLEOTIDE_TO_INDEX[nucleotide] else: encoded[i, j] = -1 # Unknown/ambiguous return encoded @staticmethod def _encode_amino_acids(sequences: list[str]) -> np.ndarray: """Encode amino acid sequences as integer arrays.""" n_sequences = len(sequences) n_sites = len(sequences[0]) encoded = np.zeros((n_sequences, n_sites), dtype=np.int8) for i, seq in enumerate(sequences): for j, aa in enumerate(seq): if aa in AA_TO_INDEX: encoded[i, j] = AA_TO_INDEX[aa] else: encoded[i, j] = -1 # Unknown return encoded
[docs] def to_phylip(self, filepath: Path | str) -> None: """ Write alignment to PHYLIP format file. Parameters ---------- filepath : Path or str Output file path """ filepath = Path(filepath) with open(filepath, 'w') as f: # Header n_chars = self.n_sites * (3 if self.seqtype == 'codon' else 1) f.write(f" {self.n_species} {n_chars}\n\n") # Sequences for name, encoded_seq in zip(self.names, self.sequences): f.write(f"{name}\n") # Decode sequence if self.seqtype == 'codon': seq = ''.join(INDEX_TO_CODON[idx] for idx in encoded_seq) elif self.seqtype == 'dna': seq = ''.join(INDEX_TO_NUCLEOTIDE[idx] for idx in encoded_seq) elif self.seqtype == 'aa': seq = ''.join(INDEX_TO_AA[idx] for idx in encoded_seq) # Write in blocks of 60 for i in range(0, len(seq), 60): f.write(seq[i:i+60] + '\n') f.write('\n')
[docs] def to_fasta(self, filepath: Path | str) -> None: """ Write alignment to FASTA format file. Parameters ---------- filepath : Path or str Output file path """ filepath = Path(filepath) with open(filepath, 'w') as f: for name, encoded_seq in zip(self.names, self.sequences): f.write(f">{name}\n") # Decode sequence if self.seqtype == 'codon': seq = ''.join(INDEX_TO_CODON[idx] for idx in encoded_seq) elif self.seqtype == 'dna': seq = ''.join(INDEX_TO_NUCLEOTIDE[idx] for idx in encoded_seq) elif self.seqtype == 'aa': seq = ''.join(INDEX_TO_AA[idx] for idx in encoded_seq) # Write in blocks of 60 for i in range(0, len(seq), 60): f.write(seq[i:i+60] + '\n')
def __repr__(self) -> str: return ( f"Alignment(n_species={self.n_species}, n_sites={self.n_sites}, " f"seqtype='{self.seqtype}')" )