Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mehod to go from SNP-level (SNPObject) to window-level ancestry information (LocalAncestryObject) #17

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions snputils/ancestry/genobj/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import numpy as np
import copy
import warnings
from typing import Union, List, Dict, Sequence, Optional
from typing import Union, List, Dict, Sequence, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from snputils.snp.genobj.snpobj import SNPObject
salcc marked this conversation as resolved.
Show resolved Hide resolved

from .base import AncestryObject
from snputils.snp.genobj.snpobj import SNPObject

log = logging.getLogger(__name__)

Expand All @@ -17,25 +19,25 @@ class LocalAncestryObject(AncestryObject):
"""
def __init__(
self,
haplotypes: List[str],
haplotypes: List[str],
lai: np.ndarray,
samples: Optional[List[str]] = None,
ancestry_map: Optional[Dict[str, str]] = None,
samples: Optional[List[str]] = None,
ancestry_map: Optional[Dict[str, str]] = None,
window_sizes: Optional[np.ndarray] = None,
centimorgan_pos: Optional[np.ndarray] = None,
chromosomes: Optional[np.ndarray] = None,
physical_pos: Optional[np.ndarray] = None
) -> None:
"""
Args:
haplotypes (list of str of length n_haplotypes):
haplotypes (list of str of length n_haplotypes):
A list of unique haplotype identifiers.
lai (array of shape (n_windows, n_haplotypes)):
A 2D array containing local ancestry inference values, where each row represents a
genomic window, and each column corresponds to a haplotype phase for each sample.
samples (list of str of length n_samples, optional):
samples (list of str of length n_samples, optional):
A list of unique sample identifiers.
ancestry_map (dict of str to str, optional):
ancestry_map (dict of str to str, optional):
A dictionary mapping ancestry codes to region names.
window_sizes (array of shape (n_windows,), optional):
An array specifying the number of SNPs in each genomic window.
Expand Down Expand Up @@ -508,7 +510,7 @@ def convert_to_snp_level(
) -> 'SNPObject':
"""
Convert `self` into a `snputils.snp.genobj.SNPObject` SNP-level Local Ancestry Information (LAI),
with optional support for Single Nucleotide Polymorphism (SNP) data.
with optional support for SNP data.

If SNP positions (`variants_pos`) and chromosomes (`variants_chrom`) are not specified, the method generates
SNPs uniformly across the start and end positions of each genomic window. Otherwise, the provided SNP
Expand All @@ -535,9 +537,11 @@ def convert_to_snp_level(
An array containing the Phred-scaled quality score for each SNP.

Returns:
SNPObject:
**SNPObject**:
A `SNPObject` containing SNP-level ancestry data, along with optional metadata.
"""
from snputils.snp.genobj.snpobj import SNPObject

# Extract attributes from SNPObject if provided
if snpobject is not None:
variants_chrom = variants_chrom if variants_chrom is not None else snpobject.variants_chrom
Expand Down
158 changes: 155 additions & 3 deletions snputils/snp/genobj/snpobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import copy
import warnings
import re
from typing import Any, Union, Tuple, List, Sequence, Dict, Optional
from typing import Any, Union, Tuple, List, Sequence, Dict, Optional, TYPE_CHECKING
from scipy.stats import mode

if TYPE_CHECKING:
from snputils.ancestry.genobj.local import LocalAncestryObject

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,8 +54,9 @@ def __init__(
An array containing the chromosomal positions for each SNP.
variants_qual (array of shape (n_snps,), optional):
An array containing the Phred-scaled quality score for each SNP.
calldata_lai (array of shape (n_snps, n_samples, 2)):
An array containing the ancestry for each SNP.
calldata_lai (array, optional):
An array containing the ancestry for each SNP. This array can be either 2D with shape
`(n_snps, n_samples*2)`, or 3D with shape (n_snps, n_samples, 2).
ancestry_map (dict of str to str, optional):
A dictionary mapping ancestry codes to region names.
"""
Expand Down Expand Up @@ -1364,6 +1369,153 @@ def set_empty_to_missing(self, inplace: bool = False) -> Optional['SNPObject']:
snpobj.variants_id[snpobj.variants_id == ''] = '.'
return snpobj

def convert_to_window_level(
self,
window_size: Optional[int] = None,
physical_pos: Optional[np.ndarray] = None,
chromosomes: Optional[np.ndarray] = None,
window_sizes: Optional[np.ndarray] = None,
laiobj: Optional['LocalAncestryObject'] = None
) -> 'LocalAncestryObject':
"""
Aggregate the `calldata_lai` attribute into genomic windows within a
`snputils.ancestry.genobj.LocalAncestryObject`.

**Options for defining windows (in order of precedence):**

1. **Fixed window size**:
- Use `window_size` to specify how many SNPs go into each window. The last window on each
chromosome may be larger if SNPs are not evenly divisible by the size.

2. **Custom start and end positions**:
- Provide `physical_pos` (2D array of shape (n_windows, 2)) as the [start, end] base-pair
coordinates for each window.
- If `chromosomes` is not provided and `self` has exactly one chromosome, all windows are
assumed to belong to that chromosome.
- If multiple chromosomes exist but `chromosomes` is missing, an error will be raised.
- Optionally, provide `window_sizes` to store the SNP count per-window.

3. **Matching existing windows**:
- Reuse window definitions (`physical_pos`, `chromosomes`, `window_sizes`) from an existing `laiobj`.

Args:
window_size (int, optional):
Number of SNPs in each window if defining fixed-size windows. If the total number of
SNPs in a chromosome is not evenly divisible by the window size, the last window on that
chromosome will include all remaining SNPs and therefore be larger than the specified size.
physical_pos (array of shape (n_windows, 2), optional):
A 2D array containing the start and end physical positions for each window.
chromosomes (array of shape (n_windows,), optional):
An array with chromosome numbers corresponding to each genomic window.
window_sizes (array of shape (n_windows,), optional):
An array specifying the number of SNPs in each genomic window.
laiobj (LocalAncestryObject, optional):
A reference `LocalAncestryObject` from which to copy existing window definitions.

Returns:
**LocalAncestryObject:**
A LocalAncestryObject containing window-level ancestry data.
"""
from snputils.ancestry.genobj.local import LocalAncestryObject

if window_size is None and physical_pos is None and laiobj is None:
raise ValueError("One of `window_size`, `physical_pos`, or `laiobj` must be provided.")

# 1. Fixed window size
if window_size is not None:
physical_pos = [] # Boundaries [start, end] of each window
chromosomes = [] # Chromosome for each window
window_sizes = [] # Number of SNPs for each window
for chrom in self.unique_chrom:
# Extract indices corresponding to this chromosome
mask_chrom = (self.variants_chrom == chrom)
# Subset to this chromosome
pos_chrom = self.variants_pos[mask_chrom]
# Number of SNPs for this chromosome
n_snps_chrom = pos_chrom.size

# Initialize the start of the first window with the position of the first SNP
current_start = self.variants_pos[0]

# Number of full windows with exactly `window_size` SNPs
n_full_windows = n_snps_chrom // window_size

# Build all but the last window
for i in range(n_full_windows-1):
current_end = self.variants_pos[(i+1) * window_size - 1]
physical_pos.append([current_start, current_end])
chromosomes.append(chrom)
window_sizes.append(window_size)
current_start = self.variants_pos[(i+1) * window_size]

# Build the last window
current_end = self.variants_pos[-1]
physical_pos.append([current_start, current_end])
chromosomes.append(chrom)
window_sizes.append(n_snps_chrom - ((n_full_windows - 1) * window_size))

physical_pos = np.array(physical_pos)
chromosomes = np.array(chromosomes)
window_sizes = np.array(window_sizes)

# 2. Custom start and end positions
elif physical_pos is not None:
# Check if there is exactly one chromosome
if chromosomes is None:
unique_chrom = self.unique_chrom
if len(unique_chrom) == 1:
# We assume all windows belong to this single chromosome
single_chrom = unique_chrom[0]
chromosomes = np.array([single_chrom] * physical_pos.shape[0])
else:
raise ValueError("Multiple chromosomes detected, but `chromosomes` was not provided.")

# 3. Match existing windows to a reference laiobj
elif laiobj is not None:
physical_pos = laiobj.physical_pos
chromosomes = laiobj.chromosomes
window_sizes = laiobj.window_sizes

# Allocate an output LAI array
n_windows = physical_pos.shape[0]
n_samples = self.n_samples
if len(self.calldata_lai.shape) == 3:
lai = np.zeros((n_windows, n_samples, 2))
else:
lai = np.zeros((n_windows, n_samples*2))

# For each window, find the relevant SNPs and compute the mode of the ancestries
for i, ((start, end), chrom) in enumerate(zip(physical_pos, chromosomes)):
snps_mask = (
(self.variants_chrom == chrom) &
(self.variants_pos >= start) &
(self.variants_pos <= end)
)
if np.any(snps_mask):
lai_mask = self.calldata_lai[snps_mask, ...]
mode_ancestries = mode(lai_mask, axis=0, nan_policy='omit').mode
lai[i] = mode_ancestries
else:
lai[i] = np.nan

# Generate haplotype labels, e.g. "Sample1.0", "Sample1.1"
haplotypes = [f"{sample}.{i}" for sample in self.samples for i in range(2)]

# If original data was (n_snps, n_samples, 2), flatten to (n_windows, n_samples*2)
if len(self.calldata_lai.shape) == 3:
lai = lai.reshape(n_windows, -1)

# Aggregate into a LocalAncestryObject
return LocalAncestryObject(
haplotypes=haplotypes,
lai=lai,
samples=self.samples,
ancestry_map=self.ancestry_map,
window_sizes=window_sizes,
physical_pos=physical_pos,
chromosomes=chromosomes
)

def save(self, file: Union[str, Path]) -> None:
"""
Save the data stored in `self` to a specified file.
Expand Down
Loading