-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_genfile.py
executable file
·128 lines (107 loc) · 3.89 KB
/
read_genfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
import sys
import random
import logging
import argparse
__description__ = "TBA."
__epilog__ = """
TBA.
"""
__version__ = "2019.7.0"
chrmap = {
"1": "NC_015438.1",
"2": "NC_015439.1",
"3": "NC_015440.1",
"4": "NC_015441.1",
"5": "NC_015442.1",
"6": "NC_015443.1",
"7": "NC_015444.1",
"8": "NC_015445.1",
"9": "NC_015446.1",
"10": "NC_015447.1",
"11": "NC_015448.1",
"12": "NC_015449.1"
}
def read_gen(fname):
with open(fname, "r") as genfile:
snps = next(genfile)
snps = [x.split("-")[0] for x in snps.strip().split(",")[1:]]
chrom = next(genfile)
chrom = chrom.strip().split(",")[1:]
cm = next(genfile)
cm = list(map(float, cm.strip().split(",")[1:]))
next(genfile)
rils = dict()
for ril in genfile:
ril = ril.strip().split(",")
rils[ril[0]] = ril[1:]
rm_idx = [i for i in range(len(snps)) if "seq" in snps[i]]
for i in rm_idx[::-1]:
del chrom[i]
del snps[i]
del cm[i]
for ril in rils:
del rils[ril][i]
return chrom, snps, cm, rils
def get_random_interval(chrom, snps, cm, rils, interval_cm):
rand_snp = random.randrange(len(snps))
rand_chr = chrom[rand_snp]
cm1 = cm[rand_snp]
while cm[len(chrom) - chrom[::-1].index(rand_chr) - 1] - cm1 < interval_cm:
rand_snp = random.randrange(len(snps))
rand_chr = chrom[rand_snp]
cm1 = cm[rand_snp]
offset = 0
cm2 = cm[rand_snp + offset]
while abs(cm1 - cm2) < interval_cm:
offset += 1
cm2 = cm[rand_snp + offset]
return rand_snp, rand_snp + offset
def parse_arguments():
parser = argparse.ArgumentParser(description=__description__, epilog=__epilog__)
# Required arguments
parser.add_argument("marker_file", metavar="GENFILE", help="File containing the genetic marker map")
# Optional arguments
parser.add_argument("-i", "--iterations", metavar="N", dest="iterations", help="Number of peaks to generate",
type=int, default=1)
parser.add_argument("-w", "--peak-width", metavar="CM", dest="peak_width", help="Minimum width of the peaks in cM",
type=float, default=10.0)
# Standard arguments
parser.add_argument("-v", "--verbose", help="Increase verbosity level", action="count")
parser.add_argument("-q", "--silent", help="Suppresses output messages, overriding the --verbose argument",
action="store_true")
parser.add_argument("-l", "--log", help="Set the logging output location",
type=argparse.FileType('w'), default=sys.stderr)
parser.add_argument("-V", "--version", action="version", version=__version__)
return parser.parse_args()
def set_logging(args):
log_level = logging.WARNING
if args.silent:
log_level = logging.ERROR
elif not args.verbose:
pass
elif args.verbose >= 2:
log_level = logging.DEBUG
elif args.verbose == 1:
log_level = logging.INFO
logging.basicConfig(format="[%(asctime)s] %(message)s", level=log_level, stream=args.log)
logging.debug("Setting verbosity level to %s" % logging.getLevelName(log_level))
if __name__ == "__main__":
args = parse_arguments()
set_logging(args)
chrom, snps, cm, rils = read_gen(args.marker_file)
print("lod_id", "chr", "start", "end", sep="\t")
prev_snps = list()
i = 1
while i <= args.iterations:
snp1, snp2 = get_random_interval(chrom, snps, cm, rils, args.peak_width)
overlap = False
for s1, s2 in prev_snps:
if s1 < snp1 < s2 or s1 < snp2 < s2:
overlap = True
break
if overlap:
continue
print(i, chrmap[chrom[snp1]], snps[snp1], snps[snp2], sep="\t")
prev_snps.append((snp1, snp2))
i += 1