#! /usr/bin/env python
from __future__ import print_function
import os,sys
import argparse
import tempfile
import errno
from time import time
import shutil
from modules import get_sorted_fastq_for_cluster
from modules import cluster
from modules import p_minimizers_shared
from modules import help_functions
from modules import parallelize
from modules import consensus
def single_clustering(read_array, p_emp_probs, args):
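"""Single-core clustering: every read starts as its own cluster and its own representative, then cluster.reads_to_clusters greedily merges the reads (already in sorted order) into existing clusters."""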
start_cluster = time()
clusters = {} # initialize every read as belonging to its own cluster
representatives = {} # initialize every read as its own representative
for i, b_i, acc, seq, qual, score in read_array:
clusters[i] = [acc]
representatives[i] = (i, b_i, acc, seq, qual, score)
minimizer_database, new_batch_index = {}, 1 # These data structures are only needed in multiprocessing mode. With a single core everything is processed in one "batch", so the minimizer database does not have to be passed on to later iterations.
result_dict = cluster.reads_to_clusters(clusters, representatives, read_array, p_emp_probs, minimizer_database, new_batch_index, args)
# Unpack result. The result dictionary structure is convenient for multiprocessing return but clumsy in single core mode.
clusters, representatives, _, _ = list(result_dict.values())[0]
print("Time elapesd clustering:", time() - start_cluster)
return clusters, representatives
def main(args):
"""
Code in main function is structures into 4 steps
1. Sort all reads according to expected errorfree kmers
2. Import precalculated probabilities of minimizer matching given the error rates of reads, kmer length, and window length.
This is used for calculating if reads matches representative.
3. Cluster the reads
4. Write output
"""
##### Sort all reads according to expected errorfree kmers #####
args.outfile = os.path.join(args.outfolder, "sorted.fastq")
print("started sorting seqs")
start = time()
sorted_reads_fastq_file = get_sorted_fastq_for_cluster.main(args)
read_array = [ (i, 0, acc, seq, qual, float(acc.split("_")[-1])) for i, (acc, (seq, qual)) in enumerate(help_functions.readfq(open(sorted_reads_fastq_file, 'r')))]
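# Each read_array entry is (index, batch_index, accession, sequence, quality, score); the last "_"-separated field of each accession holds the read's score, appended by the sorting step above.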
print("elapsed time sorting:", time() - start)
#################################################################
##### Import precalculated probabilities of minimizer matching given the error rates of reads, kmer length, and window length #####
print("Started imported empirical error probabilities of minimizers shared:")
start = time()
p_min_shared = p_minimizers_shared.read_empirical_p()
p_emp_probs = {}
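# Keep only entries computed for this k and for window sizes within 2 of the chosen w; store each probability under both orderings of the two error rates so lookups are symmetric.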
for k, w, p, e1, e2 in p_min_shared:
if int(k) == args.k and abs(int(w) - args.w) <= 2:
p_emp_probs[(float(e1),float(e2))] = float(p)
p_emp_probs[(float(e2),float(e1))] = float(p)
print(p_emp_probs)
print(len(p_emp_probs))
print("elapsed time imported empirical error probabilities of minimizers shared:", time() - start)
##################################################################################################################################
##### Cluster reads, bulk of code base is here #####
print("started clustring")
start = time()
if args.nr_cores > 1:
clusters, representatives = parallelize.parallel_clustering(read_array, p_emp_probs, args)
else:
print("Using 1 core.")
clusters, representatives = single_clustering(read_array, p_emp_probs, args)
# clusters, representatives = cluster.cluster_seqs(read_array, p_emp_probs, args)
print("Time elapsed clustering:", time() - start)
####################################################
##### Write output in sorted quality order! #####
outfile = open(os.path.join(args.outfolder, "final_clusters.tsv"), "w")
origins_outfile = open(os.path.join(args.outfolder, "final_cluster_origins.tsv"), "w")
nontrivial_cluster_index = 0
output_cl_id = 0
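# Clusters are written in order of decreasing size (ties broken by the representative's score); within a cluster, reads are written in order of decreasing score.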
for c_id, all_read_acc in sorted(clusters.items(), key = lambda x: (len(x[1]),representatives[x[0]][5]), reverse=True):
# for c_id, all_read_acc in sorted(clusters.items(), key = lambda x: (len(x[1]),x[0]), reverse=True):
read_cl_id, b_i, acc, c_seq, c_qual, score, error_rate = representatives[c_id]
origins_outfile.write("{0}\t{1}\t{2}\t{3}\t{4}\t{5}\n".format(output_cl_id, "_".join([item for item in acc.split("_")[:-1]]), c_seq, c_qual, score, error_rate))
for r_acc in sorted(all_read_acc, key = lambda x: float(x.split("_")[-1]) , reverse=True):
outfile.write("{0}\t{1}\n".format(output_cl_id, "_".join([item for item in r_acc.split("_")[:-1]]) ))
if len(all_read_acc) > 1:
nontrivial_cluster_index += 1
output_cl_id +=1
print("Nr clusters larger than 1:", nontrivial_cluster_index) #, "Non-clustered reads:", len(archived_reads))
print("Nr clusters (all):", len(clusters)) #, "Non-clustered reads:", len(archived_reads))
outfile.close()
origins_outfile.close()
############################
if args.consensus:
print()
print("STARTING TO CREATE CLUSTER CONSENSUS")
print()
work_dir = tempfile.mkdtemp()
print("Temporary workdirektory for polishing:", work_dir)
centers = []
reads = { acc : (seq, qual) for acc, (seq, qual) in help_functions.readfq(open(sorted_reads_fastq_file, 'r'))}
nr_reads = len(reads)
abundance_cutoff = int( args.abundance_ratio * nr_reads)
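# Only clusters containing at least abundance_ratio * (total number of reads) reads get a spoa consensus.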
for c_id, all_read_acc in sorted(clusters.items(), key = lambda x: (len(x[1]),representatives[x[0]][5]), reverse=True):
reads_path = open(os.path.join(work_dir, "reads_tmp.fq"), "w")
nr_reads_in_cluster = len(all_read_acc)
if nr_reads_in_cluster >= abundance_cutoff:
for acc in all_read_acc:
seq, qual = reads[acc]
reads_path.write("@{0}\n{1}\n{2}\n{3}\n".format(acc, seq, "+", qual))
# reads_path.write(">{0}\n{1}\n".format(str(q_id)+str(pos1)+str(pos2), seq))
reads_path.close()
# spoa_ref = create_augmented_reference.run_spoa(reads_path.name, os.path.join(work_dir,"spoa_tmp.fa"), "spoa")
print("creating center of {0} sequences.".format(nr_reads_in_cluster))
center = consensus.run_spoa(reads_path.name, os.path.join(work_dir,"spoa_tmp.fa"), "spoa")
centers.append( (nr_reads_in_cluster, c_id, center))
print("{0} centers formed".format(len(centers)))
centers_filtered = consensus.detect_reverse_complements(centers, args.rc_identity_threshold)
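# detect_reverse_complements filters the centers so that consensus sequences that are reverse complements of another center (identity above --rc_identity_threshold) are not reported twice.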
if args.medaka:
print('Currently not implemented')
else:
polished_outfile = os.path.join(args.outfolder, "consensus_references.fasta")
f = open(polished_outfile, "w")
for (nr_reads_in_cluster, c_id, center) in centers_filtered:
f.write(">{0}\n{1}\n".format("consensus_cl_id:{0}_total_supporting_reads:{1}".format(c_id,nr_reads_in_cluster), center))
print("removing temporary workdir")
print("Saving references in:", f.name)
f.close()
shutil.rmtree(work_dir)
def write_fastq(args):
from collections import defaultdict
clusters = defaultdict(list)
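# Map each cluster id to the accessions of its reads, as listed in the tsv file produced by the main command.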
with open(args.clusters) as f:
for line in f:
items = line.strip().split()
cl_id, acc = items[0], items[1]
clusters[cl_id].append(acc)
help_functions.mkdir_p(args.outfolder)
reads = { acc : (seq, qual) for acc, (seq, qual) in help_functions.readfq(open(args.fastq, 'r'))}
for cl_id in clusters:
r = clusters[cl_id]
if len(r) >= args.N:
curr_file = open(os.path.join(args.outfolder, str(cl_id) + ".fastq" ), "w")
for acc in r:
seq, qual = reads[acc]
curr_file.write("@{0}\n{1}\n{2}\n{3}\n".format(acc, seq, "+", qual))
curr_file.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="De novo clustering of long-read transcriptome reads", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--version', action='version', version='%(prog)s 0.0.6.1')
parser.add_argument('--fastq', type=str, default=False, help='Path to consensus fastq file(s)')
parser.add_argument('--flnc', type=str, default=False, help='The flnc reads generated by the isoseq3 algorithm (BAM file)')
parser.add_argument('--ccs', type=str, default=False, help='Path to consensus BAM file(s)')
# parser.add_argument('--mapping', action="store_true", help='Only infer clusters by mapping, no alignment is performed.')
parser.add_argument('--t', dest="nr_cores", type=int, default=8, help='Number of cores allocated for clustering')
parser.add_argument('--d', dest="print_output", type=int, default=10000, help='For debugging: prints the status of the clustering and the minimizer database every d reads processed.')
parser.add_argument('--q', dest="quality_threshold", type=float, default=7.0, help='Filters reads with average phred quality value under this number (default = 7.0).')
parser.add_argument('--ont', action="store_true", help='Clustering of ONT transcript reads.')
parser.add_argument('--isoseq', action="store_true", help='Clustering of PacBio Iso-Seq reads.')
parser.add_argument('--use_old_sorted_file', action="store_true", help='Use an already existing sorted file if present in the specified output directory.')
parser.add_argument('--consensus', action="store_true", help='After clustering, (1) run spoa on all clusters, (2) detect reverse complements, (3) run medaka.')
parser.add_argument('--abundance_ratio', type=float, default=0.1, help='Threshold for the --consensus algorithm. Consider only clusters containing at least this fraction of the total number of reads (default 0.1)')
parser.add_argument('--rc_identity_threshold', type=float, default=0.9, help='Threshold for the --consensus algorithm. Two consensus sequences are considered reverse complements if their identity is over this threshold (default 0.9)')
parser.add_argument('--medaka', action="store_true", help='Run final medaka polishing algorithm.')
parser.add_argument('--k', type=int, default=15, help='Kmer size')
parser.add_argument('--w', type=int, default=50, help='Window size')
parser.add_argument('--min_shared', type=int, default=5, help='Minimum number of minimizers shared between read and cluster')
parser.add_argument('--mapped_threshold', type=float, default=0.7, help='Minimum mapped fraction of read to be included in cluster. The density of minimizers to classify a region as mapped depends on quality of the read.')
parser.add_argument('--aligned_threshold', type=float, default=0.4, help='Minimum aligned fraction of read to be included in cluster. Aligned identity depends on the quality of the read.')
parser.add_argument('--batch_type', type=str, default='total_nt', help='In parallel mode, how to split the reads into chunks: "total_nt", "nr_reads", or "weighted" (default: total_nt)')
parser.add_argument('--min_fraction', type=float, default=0.8, help='Minimum fraction of minimizers shared compared to best hit, in order to continue mapping.')
parser.add_argument('--min_prob_no_hits', type=float, default=0.1, help='Minimum probability for i consecutive minimizers to be different between read and representative and still considered as mapped region, under assumption that they come from the same transcript (depends on read quality).')
parser.add_argument('--outfolder', type=str, default=None, help='Path to the output folder where all results and intermediate files are written.')
# parser.add_argument('--pickled_subreads', type=str, help='Path to an already parsed subreads file in pickle format')
parser.set_defaults(which='main')
subparsers = parser.add_subparsers(help='sub-command help')
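# Subcommand 'write_fastq': split an input fastq into one fastq file per cluster, using the final_clusters.tsv produced by the main command.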
write_fastq_parser = subparsers.add_parser('write_fastq', help='Write the reads of each cluster to a separate fastq file')
write_fastq_parser.add_argument('--clusters', type=str, help='The file "final_clusters.tsv" created by isONclust.')
write_fastq_parser.add_argument('--fastq', type=str, help='Input fastq file')
write_fastq_parser.add_argument('--outfolder', type=str, help='Output folder')
write_fastq_parser.add_argument('--N', type=int, default = 0, help='Write out only clusters with at least N reads')
# parser.add_argument('--write_fastq_clusters', default = None, help=' --write_fastq_clusters <N>. Write out clusters with more or equal than N >= 1.')
write_fastq_parser.set_defaults(which='write_fastq')
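# Example invocations (paths are placeholders):
#   isONclust --ont --fastq reads.fastq --outfolder outdir
#   isONclust write_fastq --clusters outdir/final_clusters.tsv --fastq reads.fastq --outfolder outdir/fastq_clusters --N 2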
args = parser.parse_args()
if args.which == 'write_fastq':
write_fastq(args)
print("Wrote clusters to separate fastq files.")
sys.exit(0)
if (args.fastq and (args.flnc or args.ccs)):
print("Either (1) only a fastq file, or (2) a ccs and a flnc file should be specified. ")
sys.exit()
if (args.flnc != False and args.ccs == False ) or (args.flnc == False and args.ccs != False ):
print("isONclust needs both the ccs.bam file produced by ccs and the flnc file produced by isoseq3 cluster. ")
sys.exit()
if args.ont and args.isoseq :
print("Arguments mutually exclusive, specify either --isoseq or --ont. ")
sys.exit()
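# Mode presets: Iso-Seq reads use k=15, w=50; ONT reads use k=13, w=20 (overriding any --k/--w given).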
elif args.isoseq:
args.k = 15
args.w = 50
elif args.ont:
args.k = 13
args.w = 20
if len(sys.argv)==1:
parser.print_help()
sys.exit()
if not args.fastq and not args.flnc and not args.ccs:
parser.print_help()
sys.exit()
if args.outfolder and not os.path.exists(args.outfolder):
os.makedirs(args.outfolder)
# edlib_module = 'edlib'
parasail_module = 'parasail'
# if edlib_module not in sys.modules:
# print('You have not imported the {0} module. Only performing clustering with mapping, i.e., no alignment.'.format(edlib_module))
if parasail_module not in sys.modules:
print('The {0} module could not be imported. isONclust requires it for the alignment step, exiting.'.format(parasail_module))
sys.exit(1)
if 100 < args.w or args.w < args.k:
print('Please specify a window size w that is at least k and at most 100.')
sys.exit(1)
main(args)