-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprep_goseq.py
executable file
·112 lines (94 loc) · 3.64 KB
/
prep_goseq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python3
""""
Copyright (c) 2019 Thom Griffioen
MIT License
"""
import argparse
import csv
import gzip
import logging
import lzma
import pickle
import sys
__description__ = "TBA."
__epilog__ = """
TBA.
"""
__version__ = "2019.9.4"
def get_geneid2go(idmapping_file):
logging.info("Decompressing and loading ID mapping database from %s ...", idmapping_file)
with lzma.open(idmapping_file, "rb") as f:
return pickle.load(f)
def load_gff_genes(gff_file):
logging.info("Parsing genes from %s ...", gff_file)
gene_records = dict()
with gzip.open(gff_file, "rt") as gff:
for line in gff:
if not line:
continue
if line[0] == "#":
continue
line = line.strip().split("\t")
record = {
"chr": line[0],
"start": int(line[3]),
"end": int(line[4]),
"type": line[2],
"annotation": line[-1]
}
if record["type"] != "gene":
# skip all non-genes
continue
record["annotation"] = {k: v for k, v in (x.split("=") for x in record["annotation"].split(";"))}
record["xref"] = {k: v for k, v in (x.split(":") for x in record["annotation"]["Dbxref"].split(","))}
gene_id = record["xref"].get("GeneID")
if gene_id is not None:
gene_records[gene_id] = record
return gene_records
def parse_arguments():
parser = argparse.ArgumentParser(description=__description__, epilog=__epilog__)
# Required arguments
parser.add_argument("mapping_file", metavar="MAP",
help="ID mapping file generated by make_uniprot_idmapping_db.py", default="mapping.pickle.xz")
parser.add_argument("gff_file", metavar="GFF",
help="GFF file (gzipped) of the genome containing the genes", default="genome.gff.gz")
# Standard arguments
parser.add_argument("-v", "--verbose", help="Increase verbosity level", action="count")
parser.add_argument("-q", "--silent", help="Suppresses output messages, overriding the --verbose argument",
action="store_true")
parser.add_argument("-l", "--log", help="Set the logging output location",
type=argparse.FileType('w'), default=sys.stderr)
parser.add_argument("-V", "--version", action="version", version=__version__)
return parser.parse_args()
def set_logging(args):
log_level = logging.WARNING
if args.silent:
log_level = logging.ERROR
elif not args.verbose:
pass
elif args.verbose >= 2:
log_level = logging.DEBUG
elif args.verbose == 1:
log_level = logging.INFO
logging.basicConfig(format="[%(asctime)s] %(message)s", level=log_level, stream=args.log)
logging.debug("Setting verbosity level to %s" % logging.getLevelName(log_level))
if __name__ == "__main__":
args = parse_arguments()
set_logging(args)
exitcode = 0
try:
lookup_table = get_geneid2go(args.mapping_file)
gff_genes = load_gff_genes(args.gff_file)
print("geneid", "chromosome", "start", "end", "length", "go_term", sep="\t")
for geneid, record in gff_genes.items():
go_terms = lookup_table.get(geneid, list())
for go_term in go_terms:
print(geneid, record["chr"], record["start"], record["end"], record["end"]-record["start"], go_term, sep="\t")
# except Exception as ex:
# exitcode = 1
# logging.error(ex)
# logging.debug(format_exc())
finally:
logging.debug("Shutting down logging system ...")
logging.shutdown()
sys.exit(exitcode)