-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGTExData.py
executable file
·118 lines (101 loc) · 3.67 KB
/
GTExData.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!.venv/bin/python
import os, sys, gzip, csv
import pandas as pd
import numpy as np
import time
starts = []
def tic():
starts.append(time.time())
def toc(msg="Elapsed"):
elapsed = time.time()-starts[-1]
starts.pop(-1)
print(f"{msg}: {elapsed:.2f}",file=sys.stderr)
class GTexData:
tpmdata='GTEx_gene_tpm.gct.gz'
sampledata='GTEx_SampleAttributesDS.txt'
bindata='GTEx.bin'
def __init__(self,datadir='data',ensgid=False,force=False):
self.ensgid = ensgid
if not force and os.path.exists(os.path.join(datadir,self.bindata)):
self.readbin(datadir)
else:
self.read(datadir)
self.writebin(datadir)
def read(self,datadir='data'):
self.tpmfilename = os.path.join(datadir,self.tpmdata)
self.samplefilename = os.path.join(datadir,self.sampledata)
assert(os.path.exists(self.tpmfilename))
assert(os.path.exists(self.samplefilename))
tic()
self.readsamplemetadata()
toc("Read sample metatdata")
self.df = self.getdataframe()
def readsamplemetadata(self):
self.samplemd = dict()
for row in csv.DictReader(open(self.samplefilename), delimiter="\t"):
sid = row["SAMPID"]
tissue = row["SMTSD"]
self.samplemd[sid] = dict(tissue=tissue)
def get_sample_md(self,sid,mdkey):
return self.samplemd[sid].get(mdkey,None)
def get_sample_tissue(self,sid):
return self.get_sample_md(sid,'tissue')
def get_all_tissues(self):
return self.tissues
def getdataframe(self):
tic()
data = dict()
h = gzip.open(self.tpmfilename,'rt')
# Burn top two lines
next(h);next(h)
samples = None
for line in h:
if not samples:
samples = line.split()[2:]
tissues = [ self.get_sample_tissue(s) for s in samples ]
data['Sample'] = samples
data['Tissue'] = tissues
continue
ensgid,genename,rest = line.split(None,2)
if self.ensgid:
data[ensgid] = np.array(rest.split(), dtype=np.float32)
else:
data[genename] = np.array(rest.split(), dtype=np.float32)
h.close()
toc("Read data I/O")
tic()
df = pd.DataFrame(data=data)
toc("make data-frame")
tic()
df.set_index('Sample',inplace=True)
toc("set index")
return df
def writebin(self,datadir='data'):
self.df.reset_index(inplace=True)
self.df.to_feather(os.path.join(datadir,self.bindata))
self.df.set_index('Sample',inplace=True)
def readbin(self,datadir='data'):
self.df = pd.read_feather(os.path.join(datadir,self.bindata))
self.df.set_index('Sample',inplace=True)
self.tissues = set(self.df['Tissue'])
self.samplemd = dict(map(lambda t: (t[0],dict(tisssue=t[1])),zip(self.df.index,self.df['Tissue'])))
def genes(self):
return set(self.df.columns)
def restrict(self,genes=None,tissue=None,nottissue=None,includetissue=False):
if includetissue:
genes = ['Tissue'] + list(genes)
else:
genes = list(genes)
if tissue is not None:
tissuesamp = (self.df['Tissue'] == tissue)
elif nottissue is not None:
tissuesamp = (self.df['Tissue'] != nottissue)
else:
tissuesamp = (self.df['Tissue'] != "")
return self.df[tissuesamp][genes]
if __name__ == '__main__':
genes = set(sys.argv[1:])
tic()
gtd = GTexData()
toc("Constructor")
print(gtd.restrict(genes=genes,nottissue='Liver',includetissue=True))