-
Notifications
You must be signed in to change notification settings - Fork 0
/
GenBankParser.py
123 lines (105 loc) · 4.11 KB
/
GenBankParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import re
from Bio import SeqIO
import pyranges as pr
import pandas as pd
from Logger import Logger
from Bio.SeqFeature import CompoundLocation
from functools import lru_cache
class GenBankReader:
def __init__(self, filename):
self.filename = filename
@property
@lru_cache(maxsize=None)
def records(self):
with open(self.filename, "r") as handle:
return SeqIO.to_dict(SeqIO.parse(handle, "genbank"))
class GenBankParser(Logger):
def __init__(self, filename):
super().__init__()
self.reader = GenBankReader(filename)
self.records = self.reader.records
# self.pam_finder = PAMFinder(self.records)
self.info(f"Found the following records:")
self.json(self.organisms)
@property
@lru_cache(maxsize=None)
def organisms(self):
return {
id: record.annotations.get("organism", None)
for id, record in self.records.items()
}
@property
@lru_cache(maxsize=None)
def seq_lens(self):
return {id: len(record.seq) for id, record in self.records.items()}
@property
@lru_cache(maxsize=None)
def topologies(self):
return {
id: record.annotations.get("topology", None)
for id, record in self.records.items()
}
@property
@lru_cache(maxsize=None)
def num_genes(self):
return {
id: len([f for f in record.features if f.type == "gene"])
for id, record in self.records.items()
}
@property
@lru_cache(maxsize=None)
def overhangs(self):
return {
id: (100_000 if self.topologies[id] == "circular" else 0)
for id, _ in self.records.items()
}
@property
@lru_cache(maxsize=None)
def ranges(self):
data = []
for id, record in self.records.items():
for feature in record.features:
if feature.type not in ["source", "gene"]:
continue
# Check if the feature location is a CompoundLocation
if isinstance(feature.location, CompoundLocation):
# If so, use the parts of the location
locations = feature.location.parts
else:
# If not, use the location itself
locations = [feature.location]
for location in locations:
start = int(location.start)
end = int(location.end)
strand = location.strand
interval_dict = {
"Chromosome": id,
"Start": start,
"End": end,
"Strand": "+" if strand == 1 else "-" if strand == -1 else ".",
"Locus_Tag": feature.qualifiers.get("locus_tag", [None])[0],
"Gene": feature.qualifiers.get("gene", [None])[0],
"Type": feature.type,
}
data.append(interval_dict)
df = pd.DataFrame(data)
ranges = pr.PyRanges(df)
return ranges
def make_fasta(self, filename):
# Write the records to a FASTA file
with open(filename, "w") as fasta_file:
SeqIO.write(self.records.values(), fasta_file, "fasta")
def find_gene_name_for_locus(self, locus_tag):
# Iterate through all records in the GenBank file
for record_id, record in self.records.items():
for feature in record.features:
if feature.type == "gene":
# Check if this gene feature has a locus tag that matches the locus_tag argument
if (
"locus_tag" in feature.qualifiers
and feature.qualifiers["locus_tag"][0] == locus_tag
):
# If a gene name is available, return it, otherwise return the locus tag
return feature.qualifiers.get("gene", [locus_tag])[0]
# Return None or locus_tag if not found; depends on how you want to handle not found cases
return None