-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprep_qtlsearch.py
executable file
·164 lines (134 loc) · 5.52 KB
/
prep_qtlsearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python3
""""
Copyright (c) 2019 Thom Griffioen
MIT License
"""
import argparse
import csv
import gzip
import logging
import lzma
import pickle
import sys
__description__ = "TBA."
__epilog__ = """
TBA.
"""
__version__ = "2019.6.0"
def get_lod_regions(peaks_file):
logging.info("Reading LOD peaks from %s ...", peaks_file)
regions = list()
with open(peaks_file, "r", newline="") as csv_file:
handle = csv.reader(csv_file, delimiter='\t')
next(handle)
for record in handle:
regions.append(record)
return regions
def refseq2chr(mapping_file):
mapping = dict()
with open(mapping_file, "r", newline="") as csv_file:
handle = csv.reader(csv_file, delimiter='\t')
next(handle)
for record in handle:
mapping[record[0]] = record[1]
return mapping
def get_geneid2go(idmapping_file):
logging.info("Decompressing and loading ID mapping database from %s ...", idmapping_file)
with lzma.open(idmapping_file, "rb") as f:
return pickle.load(f)
def create_qtl_file(qtl_file, lod_regions, trait=None):
logging.info("Writing QTL file to %s ...", qtl_file)
with open(qtl_file, "wt") as f:
for region in lod_regions:
print(region[0], trait, region[1], int(region[2]) / 1e6, int(region[3]) / 1e6, sep="\t", file=f)
def extract_genes_from_regions(gff_file, region):
logging.info("Extracting genes from %s ...", gff_file)
mapping = refseq2chr("refseq2chr.csv")
matching_records = list()
with gzip.open(gff_file, "rt") as gff:
for line in gff:
if not line:
continue
if line[0] == "#":
continue
line = line.strip().split("\t")
record = dict(
chr=line[0],
start=int(line[3]),
end=int(line[4]),
type=line[2],
annotation=line[-1]
)
if record["type"] != "gene":
continue
find_chr = mapping.get(region[1], "-")
if record["chr"] != find_chr or record["start"] < int(region[2]) or record["end"] > int(region[3]):
continue
record["annotation"] = {k: v for k, v in (x.split("=") for x in record["annotation"].split(";"))}
record["xref"] = {k: v for k, v in (x.split(":") for x in record["annotation"]["Dbxref"].split(","))}
matching_records.append(record)
return matching_records
def get_go_terms(gene_ids, mapping_file):
lookup_table = get_geneid2go(mapping_file)
go_terms = list()
for gi in gene_ids:
go_terms += lookup_table.get(gi, list())
return set(go_terms)
def create_traitmap_file(traits_file, go_terms, trait=None):
logging.info("Writing trait map file to %s ...", traits_file)
with open(traits_file, "wt") as f:
print("trait", "go", sep="\t", file=f)
for term in go_terms:
print(trait, term, sep="\t", file=f)
def parse_arguments():
parser = argparse.ArgumentParser(description=__description__, epilog=__epilog__)
# Required arguments
parser.add_argument("peaks_file", metavar="LOD_PEAKS", help="CSV file containing the LOD peak regions.")
parser.add_argument("qtl_out", metavar="QTL_OUT", help="Output QTL file for QTLSearch.")
parser.add_argument("trait_out", metavar="TRAIT_OUT", help="Output trait mapping file for QTLSearch.")
# Optional arguments
parser.add_argument("-g", "--gff", metavar="FILE", dest="gff_file",
help="GFF file (gzipped) of the genome containing the genes.", default="genome.gff.gz")
parser.add_argument("-m", "--map", metavar="FILE", dest="mapping_file",
help="ID mapping file generated by make_uniprot_idmapping_db.py.", default="mapping.pickle.xz")
# Standard arguments
parser.add_argument("-v", "--verbose", help="Increase verbosity level", action="count")
parser.add_argument("-q", "--silent", help="Suppresses output messages, overriding the --verbose argument",
action="store_true")
parser.add_argument("-l", "--log", help="Set the logging output location",
type=argparse.FileType('w'), default=sys.stderr)
parser.add_argument("-V", "--version", action="version", version=__version__)
return parser.parse_args()
def set_logging(args):
log_level = logging.WARNING
if args.silent:
log_level = logging.ERROR
elif not args.verbose:
pass
elif args.verbose >= 2:
log_level = logging.DEBUG
elif args.verbose == 1:
log_level = logging.INFO
logging.basicConfig(format="[%(asctime)s] %(message)s", level=log_level, stream=args.log)
logging.debug("Setting verbosity level to %s" % logging.getLevelName(log_level))
if __name__ == "__main__":
args = parse_arguments()
set_logging(args)
exitcode = 0
try:
regions = get_lod_regions(args.peaks_file)
create_qtl_file(args.qtl_out, regions, "PDW")
record_list = list()
for region in regions:
record_list += extract_genes_from_regions(args.gff_file, region)
gene_list = set(x["xref"].get("GeneID") for x in record_list)
go_terms = get_go_terms(gene_list, args.mapping_file)
create_traitmap_file(args.trait_out, go_terms, "PDW")
# except Exception as ex:
# exitcode = 1
# logging.error(ex)
# logging.debug(format_exc())
finally:
logging.debug("Shutting down logging system ...")
logging.shutdown()
sys.exit(exitcode)