Source code for crabml.optimize.optimizer

"""
Parameter optimization for phylogenetic models.
"""

import numpy as np
from scipy.optimize import minimize
from typing import Tuple

from ..core.likelihood_rust import RustLikelihoodCalculator
from ..models.codon import (
    M0CodonModel,
    M1aCodonModel,
    M2aCodonModel,
    M3CodonModel,
    M7CodonModel,
    M8CodonModel,
    M8aCodonModel,
    compute_codon_frequencies_f3x4,
)
from ..io.sequences import Alignment
from ..io.trees import Tree


[docs] class M0Optimizer: """ Optimize parameters for M0 codon model. This optimizer estimates: - kappa (transition/transversion ratio) - omega (dN/dS ratio) - branch lengths (individual or global scaling) Using maximum likelihood estimation. """
[docs] def __init__( self, alignment: Alignment, tree: Tree, use_f3x4: bool = True, optimize_branch_lengths: bool = True ): """ Initialize optimizer. Parameters ---------- alignment : Alignment Codon alignment tree : Tree Phylogenetic tree use_f3x4 : bool Use F3X4 codon frequencies (True) or uniform (False) optimize_branch_lengths : bool Optimize individual branch lengths (True) or use global scaling (False) """ self.alignment = alignment self.tree = tree self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths # Compute codon frequencies if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) # Get list of nodes with branches (exclude root) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) # Store optimization history self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """ Compute negative log-likelihood for optimization. Parameters ---------- params : np.ndarray If optimize_branch_lengths is True: [log(kappa), log(omega), log(branch1), log(branch2), ...] If optimize_branch_lengths is False: [log(kappa), log(omega), log(branch_scale)] Returns ------- float Negative log-likelihood """ # Transform parameters from log scale kappa = np.exp(params[0]) omega = np.exp(params[1]) # Update branch lengths if self.optimize_branch_lengths: # Individual branch lengths for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[2 + i]) branch_scale = 1.0 # Don't scale when using individual lengths else: # Global scaling factor branch_scale = np.exp(params[2]) # Create model with these parameters model = M0CodonModel(kappa=kappa, omega=omega, pi=self.pi) Q = model.get_Q_matrix() # Compute log-likelihood try: log_likelihood = self.calc.compute_log_likelihood( Q, self.pi, scale_branch_lengths=branch_scale ) except Exception as e: # If computation fails, return very bad likelihood print(f"Error computing likelihood: {e}") return 1e10 # Store in history hist_entry = { 'kappa': kappa, 'omega': omega, 'log_likelihood': log_likelihood } if self.optimize_branch_lengths: hist_entry['branch_lengths'] = [node.branch_length for node in self.branch_nodes] else: hist_entry['branch_scale'] = branch_scale self.history.append(hist_entry) # Return negative for minimization return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_omega: float = 0.4, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float]: """ Optimize parameters to maximize likelihood. Parameters ---------- init_kappa : float Initial kappa value init_omega : float Initial omega value method : str Optimization method (default 'L-BFGS-B') maxiter : int Maximum number of iterations Returns ------- tuple (kappa, omega, log_likelihood) if optimize_branch_lengths is True Branch lengths are updated in the tree directly """ # Initial parameters (in log space) if self.optimize_branch_lengths: # Start with current branch lengths from tree init_branch_lengths = [node.branch_length for node in self.branch_nodes] # Check if all branch lengths are zero or very small (e.g., tree has no branch lengths) # Use least-squares distance to initialize from sequence data (PAML approach) max_bl = max(init_branch_lengths) if max_bl < 0.001: print(f"Warning: Tree has no branch lengths.") print(f"Initializing with least-squares distance estimation...") from .distance_init import initialize_branch_lengths_ls init_branch_lengths = initialize_branch_lengths_ls( self.alignment, self.tree, self.branch_nodes ) # Update tree nodes with initial values for i, node in enumerate(self.branch_nodes): node.branch_length = init_branch_lengths[i] print(f"LS distance initialization complete.") print(f" Mean branch length: {np.mean(init_branch_lengths):.6f}") print(f" Range: [{np.min(init_branch_lengths):.6f}, {np.max(init_branch_lengths):.6f}]") init_params = np.array( [np.log(init_kappa), np.log(init_omega)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) # Bounds: kappa [0.1, 100], omega [0.001, 10], branches [0.0001, 50] bounds = ( [(np.log(0.1), np.log(100)), (np.log(0.001), np.log(10))] + [(np.log(0.0001), np.log(50))] * self.n_branches ) print(f"Starting optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, omega={init_omega:.4f}") print(f"Optimizing {self.n_branches} branch lengths") else: # Use global branch scaling init_params = np.array([ np.log(init_kappa), np.log(init_omega), np.log(1.0) # Initial branch scale ]) bounds = [ (np.log(0.1), np.log(100)), # log(kappa) (np.log(0.001), np.log(10)), # log(omega) (np.log(0.01), np.log(100)) # log(branch_scale) ] print(f"Starting optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, omega={init_omega:.4f}") # Clear history self.history = [] # Optimize result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) # Extract optimal parameters opt_kappa = np.exp(result.x[0]) opt_omega = np.exp(result.x[1]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, omega={opt_omega:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") if self.optimize_branch_lengths: print(f"Branch lengths updated in tree") else: opt_branch_scale = np.exp(result.x[2]) print(f"Branch scale: {opt_branch_scale:.4f}") return opt_kappa, opt_omega, opt_log_likelihood
[docs] class M1aOptimizer: """ Optimize parameters for M1a (NearlyNeutral) codon model. Estimates: - kappa (transition/transversion ratio) - omega0 (dN/dS for purifying class, constrained < 1) - p0 (proportion in purifying class) - branch lengths (individual or global scaling) """
[docs] def __init__( self, alignment: Alignment, tree: Tree, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): """ Initialize M1a optimizer. Parameters ---------- init_with_m0 : bool Initialize branch lengths by optimizing M0 first (default: True). This dramatically improves convergence. """ self.alignment = alignment self.tree = tree self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" # Extract parameters kappa = np.exp(params[0]) omega0 = np.exp(params[1]) p0 = 1.0 / (1.0 + np.exp(-params[2])) # Sigmoid for [0,1] # Update branch lengths if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[3 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[3]) # Create model model = M1aCodonModel(kappa=kappa, omega0=omega0, p0=p0, pi=self.pi) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 # Store in history self.history.append({ 'kappa': kappa, 'omega0': omega0, 'p0': p0, 'log_likelihood': log_likelihood }) return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_omega0: float = 0.5, init_p0: float = 0.7, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float]: """Optimize M1a parameters.""" # Initialize branch lengths with M0 if requested if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing branch lengths with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) _, _, m0_lnL = m0_optimizer.optimize() print(f"M0 initialization complete: lnL = {m0_lnL:.6f}\n") # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_omega0), np.log(init_p0 / (1 - init_p0))] + # Logit transform [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (np.log(0.001), np.log(0.999)), # omega0 (-10, 10)] + # p0 (logit) [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_omega0), np.log(init_p0 / (1 - init_p0)), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (np.log(0.001), np.log(0.999)), (-10, 10), (np.log(0.01), np.log(100)) ] print(f"Starting M1a optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, omega0={init_omega0:.4f}, p0={init_p0:.4f}") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_omega0 = np.exp(result.x[1]) opt_p0 = 1.0 / (1.0 + np.exp(-result.x[2])) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, omega0={opt_omega0:.4f}, p0={opt_p0:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_omega0, opt_p0, opt_log_likelihood
[docs] class M2aOptimizer: """ Optimize parameters for M2a (PositiveSelection) codon model. Estimates: - kappa - omega0 (purifying, < 1) - omega2 (positive selection, > 1) - p0, p1 (proportions) - branch lengths """
[docs] def __init__( self, alignment: Alignment, tree: Tree, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): """ Initialize M2a optimizer. Parameters ---------- init_with_m0 : bool Initialize branch lengths by optimizing M0 first (default: True). """ self.alignment = alignment self.tree = tree self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" kappa = np.exp(params[0]) omega0 = np.exp(params[1]) omega2 = np.exp(params[2]) # Hierarchical parameterization to ensure p0 + p1 + p2 = 1 # p0 is the proportion of purifying sites # f is the fraction of remaining sites that are neutral (vs positive) # So: p0 ∈ [0,1], f ∈ [0,1] # p1 = (1 - p0) * f # p2 = (1 - p0) * (1 - f) p0 = 1.0 / (1.0 + np.exp(-params[3])) # Sigmoid for p0 f = 1.0 / (1.0 + np.exp(-params[4])) # Sigmoid for fraction f p1 = (1.0 - p0) * f # p2 = (1.0 - p0) * (1.0 - f) # Not needed, just for reference # Update branch lengths if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[5 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[5]) # Create model model = M2aCodonModel( kappa=kappa, omega0=omega0, omega2=omega2, p0=p0, p1=p1, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'omega0': omega0, 'omega2': omega2, 'p0': p0, 'p1': p1, 'log_likelihood': log_likelihood }) return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_omega0: float = 0.5, init_omega2: float = 2.0, init_p0: float = 0.5, init_p1: float = 0.3, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float, float, float]: """Optimize M2a parameters.""" # Initialize branch lengths with M0 if requested if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing branch lengths with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) _, _, m0_lnL = m0_optimizer.optimize() print(f"M0 initialization complete: lnL = {m0_lnL:.6f}\n") # Hierarchical parameterization: # p0 is the proportion of purifying sites # f is the fraction of remaining sites that are neutral # p1 = (1 - p0) * f, p2 = (1 - p0) * (1 - f) init_f = init_p1 / (1.0 - init_p0) if init_p0 < 1.0 else 0.5 init_f = max(0.01, min(0.99, init_f)) # Keep in valid range logit_p0 = np.log(init_p0 / (1 - init_p0)) logit_f = np.log(init_f / (1 - init_f)) if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_omega0), np.log(init_omega2), logit_p0, logit_f] + # p0 and f logits [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (np.log(0.001), np.log(0.999)), # omega0 (np.log(1.001), np.log(20)), # omega2 (-10, 10), (-10, 10)] + # p0, p1 logits [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_omega0), np.log(init_omega2), logit_p0, logit_f, np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (np.log(0.001), np.log(0.999)), (np.log(1.001), np.log(20)), (-10, 10), (-10, 10), (np.log(0.01), np.log(100)) ] print(f"Starting M2a optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, omega0={init_omega0:.4f}, " f"omega2={init_omega2:.4f}") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_omega0 = np.exp(result.x[1]) opt_omega2 = np.exp(result.x[2]) # Extract proportions using hierarchical parameterization opt_p0 = 1.0 / (1.0 + np.exp(-result.x[3])) opt_f = 1.0 / (1.0 + np.exp(-result.x[4])) opt_p1 = (1.0 - opt_p0) * opt_f opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, omega0={opt_omega0:.4f}, " f"omega2={opt_omega2:.4f}") print(f"Proportions: p0={opt_p0:.4f}, p1={opt_p1:.4f}, p2={1-opt_p0-opt_p1:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_omega0, opt_omega2, opt_p0, opt_p1, opt_log_likelihood
[docs] class M3Optimizer: """ Optimize parameters for M3 (Discrete) codon model. Matches PAML's codeml implementation exactly: - Parameter layout: [kappa, logit_p0..logit_p_{K-2}, omega_0..omega_{K-1}, log(branches)...] - Proportion transform: PAML's f_and_x (softmax with implicit last class) - Bounds: kappa [1e-4, 999], logits [-99, 99], omegas [1e-6, 999] - Initialization: PAML's GetInitialsCodon - Optimizer: L-BFGS-B (equivalent to PAML's ming2) - Post-optimization: sortwM3 (sort omegas ascending) """
[docs] def __init__( self, alignment: Alignment, tree: Tree, n_classes: int = 3, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): self.alignment = alignment self.tree = tree self.n_classes = n_classes self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
@staticmethod def _f_and_x(logits): """PAML's f_and_x: convert K-1 logits to K proportions. f[k] = exp(x[k]) / (1 + sum(exp(x[j]))), k=0..K-2 f[K-1] = 1 / (1 + sum(exp(x[j]))) """ exp_logits = np.exp(np.asarray(logits, dtype=float)) denom = 1.0 + np.sum(exp_logits) return np.append(exp_logits / denom, 1.0 / denom)
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood. PAML parameter layout: [kappa, logit_p0..logit_p_{K-2}, omega_0..omega_{K-1}, log(branch)...] """ K = self.n_classes kappa = params[0] logit_end = 1 + (K - 1) proportions = self._f_and_x(params[1:logit_end]) omega_end = logit_end + K omegas = params[logit_end:omega_end].tolist() if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[omega_end + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[omega_end]) model = M3CodonModel( kappa=kappa, omegas=omegas, proportions=proportions.tolist(), pi=self.pi ) try: Q_matrices = model.get_Q_matrices() prop_list, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, prop_list, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'omegas': omegas, 'proportions': proportions.tolist(), 'log_likelihood': log_likelihood }) return -log_likelihood
def _build_bounds(self, K): """Build PAML-style bounds for M3 parameters.""" bounds = [(1e-4, 999.0)] # kappa (rateb) bounds.extend([(-99.0, 99.0)] * (K - 1)) # proportion logits bounds.extend([(1e-6, 999.0)] * K) # omegas (wb[0]*0.01 for NSsites) if self.optimize_branch_lengths: bounds.extend([(np.log(0.0001), np.log(50))] * self.n_branches) else: bounds.append((np.log(0.01), np.log(100))) return bounds def _build_init_params(self, K, kappa, omega, rng=None): """Build initial parameter vector using PAML's GetInitialsCodon formulas. Parameters ---------- K : int Number of site classes. kappa : float Initial kappa (from M0 or control file). omega : float Initial omega (from M0 or control file). rng : numpy.random.Generator or None Random number generator for PAML-style rndu(). If None, uses deterministic midpoint (rndu=0.5). """ if rng is not None: rndu = rng.random else: rndu = lambda: 0.5 kappa_init = 0.1 + kappa * (0.8 + 0.4 * rndu()) init_logits = [rndu() for _ in range(K - 1)] init_omegas = [ omega * (0.5 + i * 2.0 / K * (0.8 + 0.4 * rndu())) for i in range(K) ] init_omegas = [max(w, 1e-4) for w in init_omegas] params = [kappa_init] params.extend(init_logits) params.extend(init_omegas) if self.optimize_branch_lengths: for node in self.branch_nodes: params.append(np.log(max(node.branch_length, 0.001))) else: params.append(np.log(1.0)) return np.array(params), kappa_init, init_omegas, init_logits @staticmethod def _enforce_bounds(params, bounds): """PAML SetxInitials: force initials inside bounds.""" for i in range(len(params)): lo, hi = bounds[i] if params[i] < lo * 1.005: params[i] = lo * 1.05 if params[i] > hi / 1.005: params[i] = hi / 1.05 def _extract_result(self, result_x, K): """Extract and sort parameters from optimizer result.""" opt_kappa = result_x[0] logit_end = 1 + (K - 1) opt_proportions = self._f_and_x(result_x[1:logit_end]).tolist() opt_omegas = result_x[logit_end:logit_end + K].tolist() # PAML sortwM3: sort omegas ascending, reorder proportions sorted_indices = np.argsort(opt_omegas) opt_omegas = [opt_omegas[i] for i in sorted_indices] opt_proportions = [opt_proportions[i] for i in sorted_indices] return opt_kappa, opt_omegas, opt_proportions
[docs] def optimize( self, init_kappa: float = 2.0, init_omegas: list[float] = None, init_proportions: list[float] = None, method: str = 'L-BFGS-B', maxiter: int = 500, n_restarts: int = 1 ) -> Tuple[float, list[float], list[float], float]: """ Optimize M3 parameters matching PAML's codeml. Uses L-BFGS-B (equivalent to PAML's ming2) with PAML's initialization formulas from GetInitialsCodon. By default uses a single run with random initialization (matching PAML). Set n_restarts > 1 for more robust optimization at the cost of speed. Returns (kappa, omegas, proportions, log_likelihood) """ m0_omega = 2.1 if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) m0_kappa, m0_omega, m0_lnL = m0_optimizer.optimize() init_kappa = m0_kappa print(f"M0 initialization complete: kappa={m0_kappa:.4f}, omega={m0_omega:.4f}, lnL={m0_lnL:.6f}\n") K = self.n_classes bounds = self._build_bounds(K) # Save M0-optimized branch lengths for restarts m0_branch_lengths = [node.branch_length for node in self.branch_nodes] best_lnL = -np.inf best_result = None if n_restarts > 1: print(f"Starting M3 optimization (K={K}, {n_restarts} restarts)") else: print(f"Starting M3 optimization (K={K})") for trial in range(n_restarts): # Restore M0 branch lengths for each restart for node, bl in zip(self.branch_nodes, m0_branch_lengths): node.branch_length = bl # Random initialization matching PAML's rndu() rng = np.random.default_rng(seed=trial) init_params, kappa_init, trial_omegas, trial_logits = \ self._build_init_params(K, init_kappa, m0_omega, rng=rng) self._enforce_bounds(init_params, bounds) self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter, 'ftol': 1e-15, 'gtol': 1e-10} ) trial_lnL = -result.fun opt_kappa, opt_omegas, opt_proportions = self._extract_result(result.x, K) if n_restarts > 1: print(f" restart {trial}: lnL={trial_lnL:.6f}, " f"w=[{', '.join(f'{w:.4f}' for w in opt_omegas)}]") if trial_lnL > best_lnL: best_lnL = trial_lnL best_result = (opt_kappa, opt_omegas, opt_proportions) # Save branch lengths from best result best_branch_lengths = [node.branch_length for node in self.branch_nodes] opt_kappa, opt_omegas, opt_proportions = best_result # Restore best branch lengths for node, bl in zip(self.branch_nodes, best_branch_lengths): node.branch_length = bl print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}") print(f"Final omegas: {', '.join([f'{w:.4f}' for w in opt_omegas])}") print(f"Final proportions: {', '.join([f'{p:.4f}' for p in opt_proportions])}") print(f"Log-likelihood: {best_lnL:.6f}") return opt_kappa, opt_omegas, opt_proportions, best_lnL
[docs] class M7Optimizer: """ Optimize parameters for M7 (beta) codon model. Estimates: - kappa (transition/transversion ratio) - p_beta (beta distribution shape parameter 1) - q_beta (beta distribution shape parameter 2) - branch lengths """
[docs] def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): """ Initialize M7 optimizer. Parameters ---------- init_with_m0 : bool Initialize branch lengths by optimizing M0 first (default: True). """ self.alignment = alignment self.tree = tree self.ncatG = ncatG self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" kappa = np.exp(params[0]) p_beta = np.exp(params[1]) q_beta = np.exp(params[2]) # Update branch lengths if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[3 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[3]) # Create model model = M7CodonModel( kappa=kappa, p_beta=p_beta, q_beta=q_beta, ncatG=self.ncatG, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'p_beta': p_beta, 'q_beta': q_beta, 'log_likelihood': log_likelihood }) return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_p_beta: float = 0.5, init_q_beta: float = 0.5, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float]: """Optimize M7 parameters.""" # Initialize branch lengths with M0 if requested if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing branch lengths with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) _, _, m0_lnL = m0_optimizer.optimize() print(f"M0 initialization complete: lnL = {m0_lnL:.6f}\n") # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_p_beta), np.log(init_q_beta)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (np.log(0.005), np.log(99)), # p_beta (np.log(0.005), np.log(99))] + # q_beta [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_p_beta), np.log(init_q_beta), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.01), np.log(100)) ] print(f"Starting M7 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, p={init_p_beta:.4f}, q={init_q_beta:.4f}") print(f"Beta distribution discretized into {self.ncatG} categories") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_p_beta = np.exp(result.x[1]) opt_q_beta = np.exp(result.x[2]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, p={opt_p_beta:.4f}, q={opt_q_beta:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_p_beta, opt_q_beta, opt_log_likelihood
[docs] class M8Optimizer: """ Optimize parameters for M8 (beta & omega>1) codon model. Estimates: - kappa (transition/transversion ratio) - p0 (proportion in beta distribution) - p_beta (beta shape parameter 1) - q_beta (beta shape parameter 2) - omega_s (omega for positive selection class, > 1) - branch lengths """
[docs] def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): """ Initialize M8 optimizer. Parameters ---------- init_with_m0 : bool Initialize branch lengths by optimizing M0 first (default: True). """ self.alignment = alignment self.tree = tree self.ncatG = ncatG self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" kappa = np.exp(params[0]) p0 = 1.0 / (1.0 + np.exp(-params[1])) # Sigmoid for [0,1] p_beta = np.exp(params[2]) q_beta = np.exp(params[3]) omega_s = np.exp(params[4]) # Update branch lengths if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[5 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[5]) # Create model model = M8CodonModel( kappa=kappa, p0=p0, p_beta=p_beta, q_beta=q_beta, omega_s=omega_s, ncatG=self.ncatG, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'p0': p0, 'p_beta': p_beta, 'q_beta': q_beta, 'omega_s': omega_s, 'log_likelihood': log_likelihood }) return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_p0: float = 0.9, init_p_beta: float = 0.7, init_q_beta: float = 1.5, init_omega_s: float = 2.5, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float, float, float]: """Optimize M8 parameters.""" # Initialize branch lengths with M0 if requested if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing branch lengths with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) _, _, m0_lnL = m0_optimizer.optimize() print(f"M0 initialization complete: lnL = {m0_lnL:.6f}\n") # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), # Logit transform np.log(init_p_beta), np.log(init_q_beta), np.log(init_omega_s)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (-10, 10), # p0 (logit) (np.log(0.005), np.log(99)), # p_beta (np.log(0.005), np.log(99)), # q_beta (np.log(1.001), np.log(20))] + # omega_s [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), np.log(init_p_beta), np.log(init_q_beta), np.log(init_omega_s), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (-10, 10), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(1.001), np.log(20)), (np.log(0.01), np.log(100)) ] print(f"Starting M8 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, p0={init_p0:.4f}, p={init_p_beta:.4f}, " f"q={init_q_beta:.4f}, omega_s={init_omega_s:.4f}") print(f"Beta distribution discretized into {self.ncatG} categories + 1 positive selection class") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_p0 = 1.0 / (1.0 + np.exp(-result.x[1])) opt_p_beta = np.exp(result.x[2]) opt_q_beta = np.exp(result.x[3]) opt_omega_s = np.exp(result.x[4]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, p0={opt_p0:.4f}, p={opt_p_beta:.4f}, " f"q={opt_q_beta:.4f}, omega_s={opt_omega_s:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_p0, opt_p_beta, opt_q_beta, opt_omega_s, opt_log_likelihood
[docs] class M8aOptimizer: """ Optimize parameters for M8a (beta & omega=1) codon model. This is the null model for the M8a vs M8 likelihood ratio test. The only difference from M8 is that omega_s is fixed to 1.0 (not optimized). Estimates: - kappa (transition/transversion ratio) - p0 (proportion in beta distribution) - p_beta (beta shape parameter 1) - q_beta (beta shape parameter 2) - branch lengths Note: omega_s is FIXED to 1.0 (neutral), NOT optimized. """
[docs] def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True, init_with_m0: bool = True ): """ Initialize M8a optimizer. Parameters ---------- init_with_m0 : bool Initialize branch lengths by optimizing M0 first (default: True). """ self.alignment = alignment self.tree = tree self.ncatG = ncatG self.use_f3x4 = use_f3x4 self.optimize_branch_lengths = optimize_branch_lengths self.init_with_m0 = init_with_m0 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) self.branch_nodes = [node for node in tree.postorder() if node.parent is not None] self.n_branches = len(self.branch_nodes) self.history = []
[docs] def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" kappa = np.exp(params[0]) p0 = 1.0 / (1.0 + np.exp(-params[1])) # Sigmoid for [0,1] p_beta = np.exp(params[2]) q_beta = np.exp(params[3]) # Note: omega_s is FIXED to 1.0, not optimized # Update branch lengths if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[4 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[4]) # Create model with omega_s fixed to 1.0 model = M8aCodonModel( kappa=kappa, p0=p0, p_beta=p_beta, q_beta=q_beta, ncatG=self.ncatG, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'p0': p0, 'p_beta': p_beta, 'q_beta': q_beta, 'omega_s': 1.0, # Fixed 'log_likelihood': log_likelihood }) return -log_likelihood
[docs] def optimize( self, init_kappa: float = 2.0, init_p0: float = 0.9, init_p_beta: float = 0.7, init_q_beta: float = 1.5, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float, float]: """ Optimize M8a parameters. Returns ------- tuple (kappa, p0, p_beta, q_beta, log_likelihood) Note: omega_s is always 1.0 (not returned as it's fixed) """ # Initialize branch lengths with M0 if requested if self.init_with_m0 and self.optimize_branch_lengths: print("Initializing branch lengths with M0...") m0_optimizer = M0Optimizer( self.alignment, self.tree, use_f3x4=self.use_f3x4, optimize_branch_lengths=True ) _, _, m0_lnL = m0_optimizer.optimize() print(f"M0 initialization complete: lnL = {m0_lnL:.6f}\n") # Initial parameters (NOTE: 4 params instead of 5, no omega_s) if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), # Logit transform np.log(init_p_beta), np.log(init_q_beta)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (-10, 10), # p0 (logit) (np.log(0.005), np.log(99)), # p_beta (np.log(0.005), np.log(99))] + # q_beta [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), np.log(init_p_beta), np.log(init_q_beta), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (-10, 10), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.01), np.log(100)) ] print(f"Starting M8a optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, p0={init_p0:.4f}, p={init_p_beta:.4f}, " f"q={init_q_beta:.4f}, omega_s=1.0 (FIXED)") print(f"Beta distribution discretized into {self.ncatG} categories + 1 neutral class (ω=1)") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_p0 = 1.0 / (1.0 + np.exp(-result.x[1])) opt_p_beta = np.exp(result.x[2]) opt_q_beta = np.exp(result.x[3]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, p0={opt_p0:.4f}, p={opt_p_beta:.4f}, " f"q={opt_q_beta:.4f}, omega_s=1.0 (FIXED)") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_p0, opt_p_beta, opt_q_beta, opt_log_likelihood
class M5Optimizer: """ Optimizer for M5 (gamma) codon model. Optimizes kappa, alpha, beta parameters and optionally branch lengths. """ def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True ): """Initialize M5 optimizer.""" self.alignment = alignment self.tree = tree self.ncatG = ncatG self.optimize_branch_lengths = optimize_branch_lengths # Compute codon frequencies from ..models.codon import compute_codon_frequencies_f3x4 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Get branch nodes if optimizing branch lengths if optimize_branch_lengths: self.branch_nodes = [] for node in tree.traverse(): if node.branch_length is not None: self.branch_nodes.append(node) self.n_branches = len(self.branch_nodes) else: self.n_branches = 0 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" # Extract parameters (log-transformed) kappa = np.exp(params[0]) alpha = np.exp(params[1]) beta = np.exp(params[2]) # Update branch lengths if optimizing if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[3 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[3]) # Create model and compute likelihood from ..models.codon import M5CodonModel model = M5CodonModel(kappa=kappa, alpha=alpha, beta=beta, ncatG=self.ncatG, pi=self.pi) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'alpha': alpha, 'beta': beta, 'log_likelihood': log_likelihood }) return -log_likelihood def optimize( self, init_kappa: float = 2.0, init_alpha: float = 1.0, init_beta: float = 1.0, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float]: """Optimize M5 parameters.""" # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_alpha), np.log(init_beta)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (np.log(0.005), np.log(99)), # alpha (np.log(0.005), np.log(99))] + # beta [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_alpha), np.log(init_beta), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.01), np.log(100)) ] print(f"Starting M5 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, alpha={init_alpha:.4f}, beta={init_beta:.4f}") print(f"Gamma distribution discretized into {self.ncatG} categories") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_alpha = np.exp(result.x[1]) opt_beta = np.exp(result.x[2]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, alpha={opt_alpha:.4f}, beta={opt_beta:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_alpha, opt_beta, opt_log_likelihood class M9Optimizer: """ Optimizer for M9 (beta & gamma) codon model. Optimizes kappa, p0, p_beta, q_beta, alpha, beta_gamma parameters and optionally branch lengths. """ def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True ): """Initialize M9 optimizer.""" self.alignment = alignment self.tree = tree self.ncatG = ncatG self.optimize_branch_lengths = optimize_branch_lengths # Compute codon frequencies from ..models.codon import compute_codon_frequencies_f3x4 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Get branch nodes if optimizing branch lengths if optimize_branch_lengths: self.branch_nodes = [] for node in tree.traverse(): if node.branch_length is not None: self.branch_nodes.append(node) self.n_branches = len(self.branch_nodes) else: self.n_branches = 0 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" # Extract parameters kappa = np.exp(params[0]) p0 = 1.0 / (1.0 + np.exp(-params[1])) # Sigmoid for [0,1] p_beta = np.exp(params[2]) q_beta = np.exp(params[3]) alpha = np.exp(params[4]) beta_gamma = np.exp(params[5]) # Update branch lengths if optimizing if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[6 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[6]) # Create model and compute likelihood from ..models.codon import M9CodonModel model = M9CodonModel( kappa=kappa, p0=p0, p_beta=p_beta, q_beta=q_beta, alpha=alpha, beta_gamma=beta_gamma, ncatG=self.ncatG, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'p0': p0, 'p_beta': p_beta, 'q_beta': q_beta, 'alpha': alpha, 'beta_gamma': beta_gamma, 'log_likelihood': log_likelihood }) return -log_likelihood def optimize( self, init_kappa: float = 2.0, init_p0: float = 0.5, init_p_beta: float = 0.5, init_q_beta: float = 0.5, init_alpha: float = 1.0, init_beta_gamma: float = 1.0, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float, float, float, float]: """Optimize M9 parameters.""" # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), # Logit transform np.log(init_p_beta), np.log(init_q_beta), np.log(init_alpha), np.log(init_beta_gamma)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (-10, 10), # p0 (logit) (np.log(0.005), np.log(99)), # p_beta (np.log(0.005), np.log(99)), # q_beta (np.log(0.005), np.log(99)), # alpha (np.log(0.005), np.log(99))] + # beta_gamma [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), np.log(init_p_beta), np.log(init_q_beta), np.log(init_alpha), np.log(init_beta_gamma), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (-10, 10), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.01), np.log(100)) ] print(f"Starting M9 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, p0={init_p0:.4f}, p_beta={init_p_beta:.4f}, " f"q_beta={init_q_beta:.4f}, alpha={init_alpha:.4f}, beta={init_beta_gamma:.4f}") print(f"Beta distribution ({self.ncatG} categories) + Gamma distribution ({self.ncatG} categories)") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_p0 = 1.0 / (1.0 + np.exp(-result.x[1])) opt_p_beta = np.exp(result.x[2]) opt_q_beta = np.exp(result.x[3]) opt_alpha = np.exp(result.x[4]) opt_beta_gamma = np.exp(result.x[5]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, p0={opt_p0:.4f}, p_beta={opt_p_beta:.4f}, " f"q_beta={opt_q_beta:.4f}, alpha={opt_alpha:.4f}, beta={opt_beta_gamma:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_p0, opt_p_beta, opt_q_beta, opt_alpha, opt_beta_gamma, opt_log_likelihood class M4Optimizer: """ Optimizer for M4 (freqs) codon model. Optimizes kappa and proportions (p0, p1, p2, p3) with p4 = 1 - sum. Omega values are fixed at {0, 1/3, 2/3, 1, 3}. """ def __init__( self, alignment: Alignment, tree: Tree, use_f3x4: bool = True, optimize_branch_lengths: bool = True ): """Initialize M4 optimizer.""" self.alignment = alignment self.tree = tree self.optimize_branch_lengths = optimize_branch_lengths # Compute codon frequencies from ..models.codon import compute_codon_frequencies_f3x4 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Get branch nodes if optimizing branch lengths if optimize_branch_lengths: self.branch_nodes = [] for node in tree.traverse(): if node.branch_length is not None: self.branch_nodes.append(node) self.n_branches = len(self.branch_nodes) else: self.n_branches = 0 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" # Extract parameters kappa = np.exp(params[0]) # Extract proportions using softmax transformation (ensures sum to 1) # We have 4 free parameters for 5 proportions logits = params[1:5] exp_logits = np.exp(logits - np.max(logits)) # Numerical stability proportions_raw = np.concatenate([exp_logits, [1.0]]) proportions = proportions_raw / proportions_raw.sum() # Update branch lengths if optimizing if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[5 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[5]) # Create model and compute likelihood from ..models.codon import M4CodonModel model = M4CodonModel(kappa=kappa, proportions=proportions.tolist(), pi=self.pi) # Compute likelihood try: Q_matrices = model.get_Q_matrices() props, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, props, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'proportions': proportions.tolist(), 'log_likelihood': log_likelihood }) return -log_likelihood def optimize( self, init_kappa: float = 2.0, init_proportions: list[float] = None, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, list[float], float]: """Optimize M4 parameters.""" # Default proportions (equal) if init_proportions is None: init_proportions = [0.2, 0.2, 0.2, 0.2, 0.2] else: if len(init_proportions) != 5: raise ValueError("M4 requires exactly 5 initial proportions") # Normalize init_proportions = np.array(init_proportions) init_proportions = init_proportions / init_proportions.sum() # Convert proportions to logits (inverse softmax for 4 free params) # We use first 4 proportions and compute 5th as 1 - sum logits = np.log(init_proportions[:4]) # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa)] + logits.tolist() + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100))] + # kappa [(-10, 10)] * 4 + # logits for proportions [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array( [np.log(init_kappa)] + logits.tolist() + [np.log(1.0)] ) bounds = ( [(np.log(0.1), np.log(100))] + # kappa [(-10, 10)] * 4 + # logits for proportions [(np.log(0.01), np.log(100))] # scale ) print(f"Starting M4 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}") print(f"Initial proportions: {[f'{p:.4f}' for p in init_proportions]}") print(f"Fixed omegas: [0.0, 0.333, 0.667, 1.0, 3.0]") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) # Extract optimized proportions logits = result.x[1:5] exp_logits = np.exp(logits - np.max(logits)) proportions_raw = np.concatenate([exp_logits, [1.0]]) opt_proportions = (proportions_raw / proportions_raw.sum()).tolist() opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}") print(f"Final proportions: {[f'{p:.4f}' for p in opt_proportions]}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_proportions, opt_log_likelihood class M6Optimizer: """ Optimizer for M6 (2gamma) codon model. Optimizes kappa, p0, alpha1, beta1, alpha2 (where alpha2 = beta2). """ def __init__( self, alignment: Alignment, tree: Tree, ncatG: int = 10, use_f3x4: bool = True, optimize_branch_lengths: bool = True ): """Initialize M6 optimizer.""" self.alignment = alignment self.tree = tree self.ncatG = ncatG self.optimize_branch_lengths = optimize_branch_lengths # Compute codon frequencies from ..models.codon import compute_codon_frequencies_f3x4 if use_f3x4: self.pi = compute_codon_frequencies_f3x4(alignment) else: self.pi = np.ones(61) / 61 # Get branch nodes if optimizing branch lengths if optimize_branch_lengths: self.branch_nodes = [] for node in tree.traverse(): if node.branch_length is not None: self.branch_nodes.append(node) self.n_branches = len(self.branch_nodes) else: self.n_branches = 0 # Create Rust likelihood calculator self.calc = RustLikelihoodCalculator(alignment, tree) def compute_log_likelihood(self, params: np.ndarray) -> float: """Compute negative log-likelihood for optimization.""" # Extract parameters (log-transformed) kappa = np.exp(params[0]) p0 = 1.0 / (1.0 + np.exp(-params[1])) # Sigmoid for [0,1] alpha1 = np.exp(params[2]) beta1 = np.exp(params[3]) alpha2 = np.exp(params[4]) # alpha2 = beta2 # Update branch lengths if optimizing if self.optimize_branch_lengths: for i, node in enumerate(self.branch_nodes): node.branch_length = np.exp(params[5 + i]) branch_scale = 1.0 else: branch_scale = np.exp(params[5]) # Create model and compute likelihood from ..models.codon import M6CodonModel model = M6CodonModel( kappa=kappa, p0=p0, alpha1=alpha1, beta1=beta1, alpha2=alpha2, ncatG=self.ncatG, pi=self.pi ) # Compute likelihood try: Q_matrices = model.get_Q_matrices() proportions, _ = model.get_site_classes() log_likelihood = self.calc.compute_log_likelihood_site_classes( Q_matrices, self.pi, proportions, scale_branch_lengths=branch_scale ) except Exception as e: print(f"Error computing likelihood: {e}") return 1e10 self.history.append({ 'kappa': kappa, 'p0': p0, 'alpha1': alpha1, 'beta1': beta1, 'alpha2': alpha2, 'log_likelihood': log_likelihood }) return -log_likelihood def optimize( self, init_kappa: float = 2.0, init_p0: float = 0.5, init_alpha1: float = 1.0, init_beta1: float = 1.0, init_alpha2: float = 1.0, method: str = 'L-BFGS-B', maxiter: int = 200 ) -> Tuple[float, float, float, float, float, float]: """Optimize M6 parameters.""" # Initial parameters if self.optimize_branch_lengths: init_branch_lengths = [node.branch_length for node in self.branch_nodes] init_params = np.array( [np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), # Logit transform np.log(init_alpha1), np.log(init_beta1), np.log(init_alpha2)] + [np.log(max(bl, 0.001)) for bl in init_branch_lengths] ) bounds = ( [(np.log(0.1), np.log(100)), # kappa (-10, 10), # p0 (logit) (np.log(0.005), np.log(99)), # alpha1 (np.log(0.005), np.log(99)), # beta1 (np.log(0.005), np.log(99))] + # alpha2 [(np.log(0.0001), np.log(50))] * self.n_branches ) else: init_params = np.array([ np.log(init_kappa), np.log(init_p0 / (1 - init_p0)), np.log(init_alpha1), np.log(init_beta1), np.log(init_alpha2), np.log(1.0) ]) bounds = [ (np.log(0.1), np.log(100)), (-10, 10), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.005), np.log(99)), (np.log(0.01), np.log(100)) ] print(f"Starting M6 optimization with method={method}, maxiter={maxiter}") print(f"Initial: kappa={init_kappa:.4f}, p0={init_p0:.4f}, alpha1={init_alpha1:.4f}, " f"beta1={init_beta1:.4f}, alpha2={init_alpha2:.4f}") print(f"Mixture of 2 gamma distributions discretized into {self.ncatG} categories") self.history = [] result = minimize( self.compute_log_likelihood, init_params, method=method, bounds=bounds, options={'maxiter': maxiter} ) opt_kappa = np.exp(result.x[0]) opt_p0 = 1.0 / (1.0 + np.exp(-result.x[1])) opt_alpha1 = np.exp(result.x[2]) opt_beta1 = np.exp(result.x[3]) opt_alpha2 = np.exp(result.x[4]) opt_log_likelihood = -result.fun print(f"\nOptimization complete!") print(f"Final: kappa={opt_kappa:.4f}, p0={opt_p0:.4f}, alpha1={opt_alpha1:.4f}, " f"beta1={opt_beta1:.4f}, alpha2={opt_alpha2:.4f}") print(f"Log-likelihood: {opt_log_likelihood:.6f}") print(f"Iterations: {len(self.history)}") return opt_kappa, opt_p0, opt_alpha1, opt_beta1, opt_alpha2, opt_log_likelihood