-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeduplicate_anchors.py
95 lines (77 loc) · 2.75 KB
/
deduplicate_anchors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
Deduplicate anchors by clustering them based on their similarity.
Tavor's code
"""
import time
import numpy as np
import scipy
import networkx as nx
import argparse
def parse_args():
parser = argparse.ArgumentParser(description="Deduplicate anchors")
parser.add_argument(
"--input", type=str, required=True, help="Input file containing anchors"
)
parser.add_argument(
"--output",
type=str,
required=True,
help="Output file containing deduplicated anchors",
)
parser.add_argument(
"--maxShiftDist",
type=int,
default=5,
help="Maximum number of bases that sequences can be shifted and considered in the same cluster",
)
return parser.parse_args()
def bpToInt(x):
if x == "A":
return 0
if x == "T":
return 1
if x == "C":
return 2
if x == "G":
return 3
return 4 # for nan
# i,j-th entry is 1 if i-th row of A equals j-th row of B
def compare_rows_hash(A, B):
n, k = A.shape
hash_A = np.array([hash(tuple(row)) for row in A])
hash_B = np.array([hash(tuple(row)) for row in B])
return scipy.sparse.csr_matrix(hash_A[:, None] == hash_B)
# main function
# inputs: anchLst, a list of sequences; maxShiftDist, the maximum number of bases that sequences can be shifted and considered in the same cluster
# output: a list of lists, where each list is a cluster of sequences
def clusterAnchors(anchLst, maxShiftDist=5):
start = time.time()
bpArr = np.array([[bpToInt(x) for x in s] for s in anchLst], dtype=np.uint8)
n, k = bpArr.shape
assert maxShiftDist <= k
simMat = scipy.sparse.csr_matrix((n, n), dtype=bool)
for shift in range(1, maxShiftDist + 1):
simMatUpdate = compare_rows_hash(bpArr[:, shift:], bpArr[:, :-shift])
simMat = simMat + simMatUpdate
print("Time until adjacency mat constructed", time.time() - start)
# from this similarity matrix, generate clusters
start = time.time()
G = nx.from_numpy_array(simMat)
assemblies = list(nx.connected_components(G))
print("Time until networkx done", time.time() - start)
# order clusters by size
assemblies.sort(key=len, reverse=True)
return [[anchLst[i] for i in list(cc)] for cc in assemblies] # output sequences
def main():
args = parse_args()
with open(args.input, "r") as f:
anchLst = [line.strip() for line in f]
clusters = clusterAnchors(anchLst, args.maxShiftDist)
# return the first anchor in the cluster as the representative anchor
with open(args.output, "w") as f:
for i in range(len(clusters)):
for j in range(len(clusters[i])):
f.write(str(i) + "\t" + clusters[i][j] + "\n")
return None
if __name__ == "__main__":
main()