Skip to content

Commit

Permalink
Various feature improvements and bugfixes
Browse files Browse the repository at this point in the history
 - Improve function variant edit distance calculation for allele selection
 - Improve deletion calling by implementing exon identification
 - Bugfixes for special case 3DP1 deletion variants
 - Add and expand test suite
  • Loading branch information
michael-ford committed Mar 29, 2024
1 parent ef3cfda commit de771d2
Show file tree
Hide file tree
Showing 16 changed files with 1,131 additions and 37 deletions.
431 changes: 431 additions & 0 deletions src/kir_annotator/Untitled.ipynb

Large diffs are not rendered by default.

188 changes: 179 additions & 9 deletions src/kir_annotator/alignment_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ def get_cigar(alignment):
cigar_list = re.split(r'([A-Z=])', alignment.cigar.decode.decode())[:-1]
return [(int(cigar_list[i]), cigar_list[i+1]) for i in range(0, len(cigar_list), 2)]

def perform_alignment(seq1, seq2):
parasail_matrix = parasail.matrix_create("ACGT", 2, -8)
gap_open_penalty = 12
gap_extend_penalty = 2
return parasail.sg_qx_trace_scan_32(seq1, seq2, gap_open_penalty, gap_extend_penalty, parasail_matrix)


def align_to_wildtype(assembly_seq: str, wildtype_seq: str, seq_is_reversed: bool) -> Tuple[str, List[Tuple[int, str]]]:
"""
Expand All @@ -28,11 +34,6 @@ def align_to_wildtype(assembly_seq: str, wildtype_seq: str, seq_is_reversed: boo
if not assembly_seq or not wildtype_seq:
raise ValueError("Empty sequence")

def perform_alignment(seq1, seq2):
parasail_matrix = parasail.matrix_create("ACGT", 2, -8)
gap_open_penalty = 12
gap_extend_penalty = 2
return parasail.sg_qx_trace_scan_32(seq1, seq2, gap_open_penalty, gap_extend_penalty, parasail_matrix)

if seq_is_reversed:
assembly_seq = mappy.revcomp(assembly_seq)
Expand Down Expand Up @@ -88,11 +89,18 @@ def identify_functional_variants(variants: List[Tuple[int, str]], wildtype: Alle
func_variants = []
for pos, op in variants:
region, relative_pos = wildtype.region(pos)
if region and region.is_exon and not region.pseudo:
if op.startswith("ins") or op.startswith("del"):
if op.startswith("del"):
func_variants.extend(identify_exon_deletions(pos, op, wildtype))
elif region and region.is_exon:
if op.startswith("ins"):
func_variants.append((pos, op))
else:
assert wildtype.seq[pos] == op[0]
assert wildtype.seq[pos] == op[0], f"Reference mismatch at position {pos} for variant {op} on wildtype {wildtype.gene.name+'*'+wildtype.name}"

if wildtype.gene.name == 'KIR3DP1' and pos == 2114 and op == 'C>T':
func_variants.append((pos, op))
continue

seq = list(region.seq)
seq[relative_pos] = op[2] # apply SNP
prot = "".join(
Expand All @@ -109,8 +117,130 @@ def identify_functional_variants(variants: List[Tuple[int, str]], wildtype: Alle
logging.warning(f"Error translating sequence for variant {pos} {op} on wildtype {wildtype.gene.name+'*'+wildtype.name}: {str(warning.message)}")
if prot != wildtype.protein:
func_variants.append((pos, op))

return func_variants

def identify_exon_deletions(pos: int, op: str, wildtype: Allele) -> List[Tuple[int, str]]:
"""
Identifies exon deletions based on a mutation operation.
This function calculates the actual deletion impact on exons considering the operation
start position and length. It handles full and partial deletions within exons while considering
whether the exon is functional (non-pseudo).
Args:
pos (int): The starting position of the deletion.
op (str): The deletion operation string, indicating the sequence being deleted.
wildtype (Allele): The wildtype allele object with exon regions information.
Returns:
List[Tuple[int, str]]: A list of tuples where each tuple represents the start position
and the deletion string for each affected exon.
"""
end_pos = pos + len(op[3:]) - 1 # Adjust based on the actual format of 'op'

custom_deletions = []
if wildtype.gene.name in ['KIR3DP1', 'KIR3DL3']:
custom_deletions = custom_deletions_fix(pos, op, wildtype)
if custom_deletions:
return custom_deletions

# Get all affected regions by the deletion
affected_regions = identify_deletion_regions(pos, op, wildtype)

op_seq = op[3:]
exon_deletions = [] # Store deletions that occur within exons
for region in affected_regions:
if region.is_exon:
if len(region.seq) == len(op_seq) and region.seq != op_seq:
offset = pos - region.start
if offset < 0:
if region.seq == op_seq[-offset:]+op_seq[:-offset]:
exon_deletions.append((region.start, f"del{region.seq}"))
break
elif offset > 0:
if region.seq == op_seq[-offset:]+op_seq[:-offset]:
exon_deletions.append((region.start, f"del{region.seq}"))
break

# Calculate overlap of the deletion with the exon
overlap_start = max(pos, region.start)
overlap_end = min(end_pos, region.end)
overlap_seq = region.seq[overlap_start - region.start:overlap_end - region.start + 1]

if overlap_seq: # Check if there is an overlapping sequence
exon_deletions.append((overlap_start, f"del{overlap_seq}"))

return exon_deletions + custom_deletions

def custom_deletions_fix(pos: int, op: str, wildtype: Allele) -> List[Tuple[int, str]]:
custom_deletions_3DP1 = {0:
(335,
'delGGGGATGGAGATCTGGGCCCAGAGGTGGAGATATAGGCCTGGAGGTGGAGTTATGGGCCTGGAGTGGAGATCTGGGCCTGGAGTGGATATATGGGCCTGGAGATGGAGTGATGGGCCTAGAAGTGGAGATCTGGGTCTGGAGTGGAGATATGGGCCTGGAGGTGGAGATATGGGCCTGGAGTGGAGATCTGGGCCTGGAGTGGAGATAGGAACCTGGAGGGGAGATATGAGCCTGGAGTGAAGATATTGGCCTGGGATGGAGATATGGGCCTGGAGTGGAGACATGGGCCTGGAGGTGGAGATATGGGCCTGGAGGTGGAGACATGGGCCTAGAGGTGGATATCTGGGCCTGGAGTGGACATATGGGCCTAGGATGGAGATATGGGCCTGGGTGTGGAGATATGGGCTTGGGGTGGAGATATGGGCCTGGATTGGAGATATGGGTCTAGGGTGGAAATATTGGCCTGGAGTGGAGATATGGGCCTGGAGTGGAGATATGGGCTTGGGGTGGGGATAGGGGCCTGGGGTGCGGATATGGGCCTGCAGGCTGGGTCTCTACACAGCCGACAGCCCTGTTCTTGGGTGCAGGCTGGCACTGAGGGTGAGTTTCCCTTCAGCCCAGCAAGGGCCTGGCTACCAAGACTCACAGCCCAGTGGGGGCAGCAAGGGAGTCCTGGTTTGCCTGCAGATGGATGGTCCATCATGATCTTTCTTTCCAGGGTTCTTCTTGCTGCAGGGGGCCTGGACACATGAGGGTGAGTCCTTCTCCAAACCTTCGGGTGTCATCTCCCCACATAAGAGGATTTTCCTGAAACAGGAGGGAAGCCCGGTGGGGGATTTTCTTATAAACAAGGATGAGGAGACCCTGGGGTGCTCAGCCCACAGTTCCGACCTTGCCCTCCCCAGCCTTCCTTTCCCTTGGCTGAGTCAGGTTCTGTGGGAACCCGGGAGGGTAGACTGGGGTCCTCCAAGCTGGGCTGTGCGGCTGGGATGTGGTGTCACTGGCAGAGGAAGGGAGCAAAGCAGTGCTAGGAACAGCAGGCCTCTGAGGACAAAGGTGTAACTCACACCCTCCAGCGTTTCCATGACGGTAGGGGCTGCAGTGTGGCTGCTGTCATTCTACCTCAGAGGTGGGGGAACCCCAGCCAGGGCCCTGACCTTCCAAATCCTCTGTTGGGGGCTCAGTTGTGTATTGTGGTTCACACATTGGCTGATATTCCATTCACAAAGAACATGCCCTCGACTCCATGTCTATTTGTGTTGTTTTATGTGAGTAATCTTGCAGGATTAAAATCTAGTAGGAGTCCCTTACTCAGCACTTGCTCAAAGTTCTCAGCTGACACTTTTGTTGTAGAGAGACGCCAAGTCTATGCGGGGTGGGTCCTTCCTGTAGCCCTGGGCACCCAGGTGTGGTAGGAGCCTTAGAAAGTGGAAATGGGAGAATCTTCTGACACGTGGAGGGAGGGGCGGCTC')}
custom_deletions_3DL3 = {(0,
'delGTATGAGAGATTGGATCTGAGACGTGTTTTGAGTTGGTTATAGTGAAGGATGCAAGGTGTCAATTCTAGTTGGAACAATTTCCAGGAAGCCATGTTCTGCTCTTGACCAAACAGCCACTGGGCCTCATGCAAGGTAGAAATAGCCTGCATACGTCATCCTCCCATGATGTGGTCAGCATGTAAACTGCATGAGCCCCTCACAACATCCTGTGTGCTGCTGAACTGAGCTGGGGCGCAGCCGCCTGTCTGCACCGGCAGCACCATGTCGCTCATGGTCGTCAGCATGGCGTGTGTTGGTGAGTCCTGGAAGGGAATCGAGGGAGGGAGCGGTGGGGTGGAGATCTGGGCCTGGAGTGGAGATATGGGCCTGGAGTGGAGATATGGGCCTGGAGTGGAGATATAGGCCTGGAGTGGAGATATGGGCCTGGGGTGGAGATATGGGCCTGGAGTGGAGATATGGGCCTGGAACTGTAGATATGGGCCTGAAGTAGAGATATGGGCCTGGAGTAGAGATATGGGCCTGGAACTGTAGATATGGGCCTGGAGTGGAGATATTGGCTTGGAGTGCAGATATGGACCTGGAATTGAGATACGGGCCTGGAGGTGGAGATATGGGCCTAGAGTGGAGATATGGGCCTGGAGGTGGAGATATGGGCCTGGAACTGTAGATATGGGCCTGGAGTAGAGATATGGGCCTGGAGTGGAGATGTTGGCTTGGAGTGCAGATATGGGCCTGGAATGGAGACACGGGCCTGGAGGTGGAGATACAGGCCTGGAGGTGGAGATATGGGCCTGGAGTGTAGATATGGGCCTGGAGTAGAGATATAGGACAGAGGTGGAGATATAGGCCTGGAGTGGAGATATGGGCCTGGAGTAGAGATATAG'):
(262, 'insTCAT')}
if wildtype.gene.name == 'KIR3DP1':
if pos == 0 and len(op[3:]) >= 1473:
return [custom_deletions_3DP1[pos]]
if wildtype.gene.name == 'KIR3DL3':
if (pos, op) in custom_deletions_3DL3:
return [custom_deletions_3DL3[(pos, op)]]
return []

def identify_deletion_regions(pos: int, op: str, wildtype: Allele) -> bool:
"""
Determines if a deletion affects functional (non-pseudo) exon regions within an allele.
Args:
pos (int): The starting position of the deletion.
op (str): The deletion operation string, indicating the sequence being deleted.
wildtype (Allele): The wildtype allele object, which should have a method 'region' to get region info.
Returns:
bool: True if the deletion affects any functional exon regions, False otherwise.
"""
start_region, _ = wildtype.region(pos)
end_region, _ = wildtype.region(pos + len(op[3:]) - 1)

# If the start and end regions are the same and it's not a functional exon, no impact
if start_region.name == end_region.name:
if start_region.is_exon:
return [start_region]
else:
return []

# Check if any region affected by the deletion is a functional exon
affected_regions = []
within_affected = False
for region in wildtype.regions.values():
# Start adding regions once the start_region is encountered
if region.name == start_region.name:
within_affected = True
if within_affected:
affected_regions.append(region)
if region.name == end_region.name:
break # Stop once the end_region is passed

return affected_regions

def identify_functional_deletion(pos: int, op: str, wildtype: Allele) -> bool:
"""
Determines if a deletion affects functional (non-pseudo) exon regions within an allele.
Args:
pos (int): The starting position of the deletion.
wildtype (Allele): The wildtype allele object, which should have a method 'region' to get region info.
Returns:
bool: True if the deletion affects any functional exon regions, False otherwise.
"""
affected_regions = identify_deletion_regions(pos, op, wildtype)
if any([region.is_exon and not region.pseudo for region in affected_regions]):
return True
return False



def identify_closest_allele(variants: List[Tuple[int, str]], functional_variants: List[Tuple[int, str]], gene: Gene) -> Dict:
"""
Expand Down Expand Up @@ -146,10 +276,50 @@ def get_allele_functional_variants(allele: Allele) -> set:
"missing mut": allele.mutations - set(variants),
"missing functional mut": allele_func_variants - functional_variants,
"new functional mut": functional_variants - allele_func_variants,
"jaccard functional distance": jaccard_distance(allele_func_variants, functional_variants),
"jaccard functional distance": normalized_jaccard_distance(allele_func_variants, gene, functional_variants),
"jaccard distance": jaccard_distance(allele.mutations, set(variants)),
}
allele_similarity.append(similarity_metrics)

return sorted(allele_similarity, key=lambda x: (x["jaccard functional distance"], x['jaccard distance']))

def normalized_jaccard_distance(allele_variants, gene, functional_variants: set) -> float:
"""
Calculates the normalized Jaccard index between the called functional variants and the allele's variants,
considering the gene's functional variants as the universe of possible variants.
Args:
allele: The allele object, expected to have a 'mutations' attribute.
gene: The gene object, expected to have a 'functional' attribute which is a set of functional variant positions.
functional_variants (set): A set of positions of functional variants called.
Returns:
float: The normalized Jaccard index.
"""
# Extract called variant positions and allele variants within the gene's functional set
called_var_pos = functional_variants

# Calculate intersection: variants agree between allele and call, or both match the wildtype
intersection = {var for var in gene.functional if (var in allele_variants) == (var in called_var_pos)}

# Calculate union: all unique variants from gene's functional variants and called functional variants
union = set(gene.functional.keys()) | called_var_pos

# Return normalized Jaccard index: size of intersection divided by size of union
return 1-(sum([calculate_variant_size(op) for p, op in intersection]) / sum([calculate_variant_size(op) for p, op in union])) if union else 1.0 # Return 1.0 if union is empty, meaning perfect match/no variation


def calculate_variant_size(op):
"""
Calculates the size of a variant operation.
Args:
op (str): The variant operation string.
Returns:
int: The size of the variant operation.
"""
if op.startswith("ins") or op.startswith("del"):
return len(op[3:])
else: # SNV
return 1
47 changes: 46 additions & 1 deletion src/kir_annotator/annotation_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .alignment_variants import align_to_wildtype, call_variants
from .io_utils import load_fixes
from typing import Tuple, List, Dict, Optional
from typing import Tuple, List, Dict, Optional, Any
from .common import Gene
import mappy, logging
from collections import OrderedDict

logger = logging.getLogger("kir-annotator")

Expand Down Expand Up @@ -193,3 +194,47 @@ def calculate_new_positions(start: int, end: int, adjust_start: int, adjust_end:
new_end = end - adjust_start
return new_start, new_end


def filter_annotations(annotations: List[Tuple[OrderedDict, Any]], database, min_cov=0.5) -> List[Tuple[OrderedDict, Any]]:
"""
Filters out annotations that are wholly contained within another annotation.
Args:
annotations (List[Tuple[OrderedDict, Any]]): A list of annotation tuples.
Returns:
List[Tuple[OrderedDict, Any]]: The filtered list of annotation tuples.
"""
# Create a new list for the filtered annotations
filtered_annotations = []

# Sort the annotations based on the 'start' position to simplify containment checks
sorted_annotations = sorted(annotations, key=lambda x: x[0]['start'])

# Keep track of intervals to identify containment
intervals = []

incomplete_gene_copies = []
for annotation in sorted_annotations:
current_start = annotation[0]['start']
current_end = annotation[0]['end']
is_contained = False

# Check if current annotation is contained within any existing interval
for (start, end) in intervals:
if start <= current_start and current_end <= end:
is_contained = True
logger.info(f"Annotation {annotation[0]['gene']} at {current_start}-{current_end} is contained within another annotation and will be removed.")
break # No need to check other intervals

gene_coverage = (current_end-current_start) / float(len(database[annotation[0]['gene']].wildtype.seq))

# If the annotation is not contained within any other, add it to the filtered list and update the intervals
if not is_contained and gene_coverage >= min_cov:
filtered_annotations.append(annotation)
intervals.append((current_start, current_end))
elif gene_coverage < min_cov:
annotation[0]['coverage'] = gene_coverage
incomplete_gene_copies.append(annotation[0])

return filtered_annotations, incomplete_gene_copies
12 changes: 9 additions & 3 deletions src/kir_annotator/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
log_format = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'

# Configure your logger
logging.basicConfig(level=logging.DEBUG, format=log_format)
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger("kir-annotator")


Expand Down Expand Up @@ -41,14 +41,16 @@ def get_data_file_path(filename: str) -> Optional[str]:
if os.path.exists(data_file_path):
return data_file_path
else:
print(f"Error: The file {filename} does not exist.")
logger.error(f"Error: The file {filename} does not exist.")
return None


@dataclass
class Region:
gene: str
name: str
start: int
end: int
seq: str
partial: bool = False
pseudo: bool = False
Expand All @@ -73,6 +75,7 @@ class Allele:
func: set
minor: set
mutations: set
keystones: set

def __init__(self, gene, name, record):
"""Initialize the allele from an IMGT (kir.dat) record"""
Expand All @@ -99,6 +102,8 @@ def __init__(self, gene, name, record):
self.regions[name] = Region(
gene,
name,
int(f.location.start),
int(f.location.end),
str(record.seq[int(f.location.start) : int(f.location.end)]),
"partial" in f.qualifiers,
"pseudo" in f.qualifiers,
Expand All @@ -108,6 +113,7 @@ def __init__(self, gene, name, record):
self.func = set()
self.minor = set()
self.mutations = set()
self.keystones = set()

def parse_mutations(self):
"""Find all core mutations that modify the protein."""
Expand Down Expand Up @@ -353,7 +359,7 @@ def fasta_from_seq(name, seq):
for n, s in zip(name, seq):
result.append('>{}\n{}'.format(n, s))
except TypeError:
log.error('Please provide a iterable or string')
logger.error('Please provide a iterable or string')
raise TypeError
else:
result = ['>{}\n{}'.format(name, seq)]
Expand Down
Binary file modified src/kir_annotator/data/kir_db.pickle
Binary file not shown.
Loading

0 comments on commit de771d2

Please sign in to comment.