Run_TE_ID_reseq_streaming.py
# elizabeth henaff
# Feb 2012
# elizabeth.henaff@cragenomica.es
import pysam
from BamReader import *
from ClusterList import *
from AlignedReadPair import *
import datetime
import subprocess
from pysam import csamtools
import os
import sys
import shlex
#from memory_profiler import profile
import gc
import cPickle as pickle
import psutil
import multiprocessing
import time as timetime
def load_pickle(filename):
    with open(filename, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                break
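
# usage sketch (illustrative only): load_pickle yields every object that was written
# to a single file by repeated pickle.dump() calls, e.g.
#   for obj in load_pickle(output_prefix + 'all_clusters.pkl'):
#       do_something(obj)   # do_something is a placeholder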
#@profile
def run_jitterbug_streaming(psorted_bamfile_name, verbose, te_annot, \
        te_seqs, library_name, num_sdev, output_prefix, TE_name_tag, parallel, num_CPUs, bin_size, min_mapq, generate_test_bam, print_extra_output, conf_lib_stats, mem, min_cluster_size, step1only=False, step2only=False):
    proc = psutil.Process(os.getpid())
    print proc.get_memory_info().rss

    #NOTE: comment this later !!!!!!!!!!!!!!!!
    #sorted_bam_reader = BamReader(output_prefix + ".proper_pair.sorted.bam", output_prefix)

    print "processing " + psorted_bamfile_name

    if not output_prefix:
        output_prefix = psorted_bamfile_name

    bamFileName, bamFileExtension = os.path.splitext(psorted_bamfile_name)

    filtered_bam_file_name = output_prefix + ".input_bam.filtered.sam"

    print "filtering reads in %s and writing to %s" % (psorted_bamfile_name, filtered_bam_file_name)
    # the -F 3854 flag excludes all the reads with the following bits set:
    #   read mapped in proper pair
    #   read unmapped
    #   mate unmapped
    #   not primary alignment
    #   read fails platform/vendor quality checks
    #   read is PCR or optical duplicate
    #   supplementary alignment
    # IMPORTANTLY: chimeric and non-primary reads are excluded

    # the -F 1550 flag excludes all the reads with the following bits set:
    #   read mapped in proper pair
    #   read unmapped
    #   mate unmapped
    #   read fails platform/vendor quality checks
    #   read is PCR or optical duplicate
    # IMPORTANTLY: non-primary and supplementary reads are kept, ie, split reads generated by bwa-mem
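
    # For reference, these masks decompose into the standard SAM flag bits
    # (SAM spec values, nothing jitterbug-specific):
    #   0x2    (2)    read mapped in proper pair
    #   0x4    (4)    read unmapped
    #   0x8    (8)    mate unmapped
    #   0x100  (256)  not primary alignment
    #   0x200  (512)  read fails platform/vendor quality checks
    #   0x400  (1024) read is PCR or optical duplicate
    #   0x800  (2048) supplementary alignment
    # so -F 3854 = 2 + 4 + 8 + 256 + 512 + 1024 + 2048
    # and -F 1550 = 2 + 4 + 8 + 512 + 1024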
    start_time = datetime.datetime.now()
    print "starting at %s" % (str(start_time))

    # construct list of args
    if not mem:
        # args = ["samtools", "view", "-h" , "-F", "3854", "-o" , filtered_bam_file_name , psorted_bamfile_name ]
        args1 = ["samtools", "view", "-h", "-F", "3854", psorted_bamfile_name]
        #print args1
        args2 = ["awk", r'{ if ($9 <= (-500) || $9>=500 || $9==0 ) { print $0 } }']
        #print args2
    else:
        args1 = ["samtools", "view", "-h", "-F", "1550", psorted_bamfile_name]
        #print args1
        args2 = ["awk", r'{ if ($9 <= (-1000) || $9>=1000 || $9==0 ) { print $0 } }']
        #print args2
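
    # Column $9 of a SAM line is TLEN (observed template length), so the awk step keeps
    # only pairs with an unusually large insert size (|TLEN| >= 500, or >= 1000 when the
    # mem option for bwa-mem mappings is set) or TLEN == 0, which is the case when the
    # two mates map to different chromosomes.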
    # open subprocess
    try:
        if not(step2only):
            outfile = open(filtered_bam_file_name, "w")
            p1 = subprocess.Popen(args1, stdout=subprocess.PIPE)
            p2 = subprocess.Popen(args2, stdin=p1.stdout, stdout=outfile)
            p1.stdout.close()
            output = p2.communicate()[0]
            # close the filtered SAM handle now that awk has finished writing to it
            outfile.close()
        else:
            print "Skipping step 1 of bam file filtering."
        # p = subprocess.Popen(args)
        # p.wait()
    except Exception, e:
        print "error using --pre_filter: is samtools not installed or not in your PATH? do you have the awk utility?"
        print e
        sys.exit(1)

    if step1only:
        print "Step 1 finished. Halting execution."
        sys.exit(0)
    if generate_test_bam:
        print "generating test bam"
        # the streaming version never defines psorted_bam_reader before this point,
        # so open the input bam here before writing the one-chromosome test file
        psorted_bam_reader = BamReader(psorted_bamfile_name, output_prefix)
        psorted_bam_reader.output_one_chr_reads()
        return None
    if conf_lib_stats:
        # Get the mean and sdev of insert size from supplied config file
        stats = {}
        for line in open(conf_lib_stats):
            line = line.strip()
            (tag, val) = line.split("\t")
            stats[tag] = float(val)

        isize_mean = stats["fragment_length"]
        isize_sdev = stats["fragment_length_SD"]
        rlen_mean = stats["read_length"]
        rlen_sdev = stats["read_length_SD"]

        print "mean fragment length taken from config file: %.2f" % (isize_mean)
        print "standard deviation of fragment_length: %.2f" % (isize_sdev)
        print "mean read length: %.2f" % (rlen_mean)
        print "standard deviation of read length: %.2f" % (rlen_sdev)
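
        # Expected config format (inferred from the parsing above, and matching the
        # <output_prefix>.read_stats.txt file written in the else branch below): one
        # tab-separated "tag<TAB>value" pair per line, e.g.
        #   fragment_length     350.00
        #   fragment_length_SD  35.00
        #   read_length         100.00
        #   read_length_SD      1.00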
    else:
        # disable this calculation for the time being for the streaming analysis -- require a conf file
        # print "error! must supply configuration file of library stats"
        # sys.exit(1)

        # Get the mean and sdev of insert size from the ORIGINAL bam file
        psorted_bam_reader = BamReader(psorted_bamfile_name, output_prefix)
        print "calculating mean insert size..."
        iterations = 1000000
        (isize_mean, isize_sdev, rlen_mean, rlen_sdev) = psorted_bam_reader.calculate_mean_sdev_isize(iterations)

        print "mean fragment length over %d reads: %.2f" % (iterations, isize_mean)
        print "standard deviation of fragment_length: %.2f" % (isize_sdev)
        print "mean read length: %.2f" % (rlen_mean)
        print "standard deviation of read length: %.2f" % (rlen_sdev)

        stats_file = open(output_prefix + ".read_stats.txt", "w")
        stats_file.write("fragment_length\t%.2f\n" % (isize_mean))
        stats_file.write("fragment_length_SD\t%.2f\n" % (isize_sdev))
        stats_file.write("read_length\t%.2f\n" % (rlen_mean))
        stats_file.write("read_length_SD\t%.2f" % (rlen_sdev))
        stats_file.close()
    # if the fragment sdev is much larger than expected, there might be a problem with the reads or the mapping.
    # Set a default of 0.1*fragment_length as a reasonable guess.
    # This is necessary because aberrant values for sdev will mess up the interval overlap calculation and the filtering
    if isize_sdev > 0.2 * isize_mean:
        isize_sdev = 0.1 * isize_mean
        print "WARNING: fragment length standard deviation seems way too large to be realistic.\n\
There may be something weird with the flags in your bam mapping, or a very large number of large SVs\n\
may be messing up the count.\n\
Setting the stdev to 0.1*fragment_length = %.2f for downstream calculations" % isize_sdev
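
    # Worked example: with fragment_length = 350, an estimated SD above 70 (0.2 * 350)
    # triggers this fallback and the SD used downstream becomes 35 (0.1 * 350).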

    time = datetime.datetime.now()
    print "elapsed time: " + str(time - start_time)
    ################# Find valid discordant reads in given sorted bam file ################
    # This will print bam file(s) with the set(s) of valid discordant reads, meaning
    # that the reads in a pair are mapped at distances greater than expected or to
    # two different chromosomes, and with at least one read that is mapped uniquely

    # if strict_repetitive is TRUE, will print out two bam files:
    #   <bam_file_name>.valid_discordant_pairs_strict_rep.bam, which is all valid discordant
    #   read pairs with exactly one uniquely mapping and one repetitively mapping read
    #
    #   <bam_file_name>.valid_discordant_pairs.bam, which is all valid discordant
    #   read pairs with two uniquely mapping reads
    # if strict_repetitive is FALSE, will output both sets to a single file named
    #   <bam_file_name>.valid_discordant_pairs.bam

    # if the program has already been run and the discordant reads calculated, don't do it again; look for a file called
    # <bam_file_name>.valid_discordant_pairs or <bam_file_name>.valid_discordant_pairs_strict_rep depending on the value of -s
    #if not already_calc_discordant_reads:

    # Make BamReader object with bam file of FILTERED reads
    psorted_bam_reader = BamReader(filtered_bam_file_name, output_prefix)
    valid_discordant_reads_file_name = output_prefix + ".valid_discordant_pairs.bam"
    database_file = output_prefix + "dbfile.sqlite"

    print "selecting discordant reads..."
    # this writes the bam file of discordant reads to disk to be used later, and returns the counts of different types of reads
    (bam_stats, ref_lengths, ref_names) = psorted_bam_reader.select_discordant_reads_psorted(verbose, isize_mean, valid_discordant_reads_file_name)
    #print ref_names, ref_lengths

    coverage = (bam_stats["total_reads"] * rlen_mean) / sum(ref_lengths)

    filter_conf_file = open(output_prefix + ".filter_config.txt", "w")
    filter_conf_file.write("cluster_size\t2\t%d\n" % (5 * coverage))
    filter_conf_file.write("span\t2\t%d\n" % isize_mean)
    filter_conf_file.write("int_size\t%d\t%d\n" % (rlen_mean, 2 * (isize_mean + 2 * isize_sdev - (rlen_mean - rlen_sdev))))
    filter_conf_file.write("softclipped\t2\t%d\n" % (5 * coverage))
    filter_conf_file.write("pick_consistent\t0\t-1")
    filter_conf_file.close()
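
    # Each filter_config line appears to be "<filter_name><TAB><min><TAB><max>", i.e.
    # default acceptance ranges for the downstream prediction filter (e.g. cluster_size
    # between 2 and 5x coverage); this is an interpretation of the values written above,
    # not behaviour documented in this file.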
    bam_stats_file = open(output_prefix + ".bam_stats.txt", "w")
    for key, value in bam_stats.items():
        bam_stats_file.write("%s\t%d\n" % (key, value))
    bam_stats_file.close()

    time = datetime.datetime.now()
    print "elapsed time: " + str(time - start_time)
    ################### Select valid discordant reads that match a TE #####################
    # Of the valid discordant read pairs, select those that have exactly one side that
    # overlaps a TE. This will return a list of AlignedReadPair objects.
    # the interval size is calculated as the INSIDE interval between the two reads (?? is this true?), plus num_sdev * the sd of the insert size
    print "selecting discordant read pairs where exactly one maps to a TE..."
    interval_size = isize_mean + num_sdev * isize_sdev

    if te_annot:
        discordant_bam_reader = BamReader(valid_discordant_reads_file_name, output_prefix)
        ### This one has been edited to reduce RAM usage to a fixed quota and store all results in an external sqlite file
        read_pair_one_overlap_TE_list = discordant_bam_reader.select_read_pair_one_overlap_TE_annot(te_annot, interval_size, min_mapq, database_file, bin_size)
        # the real read pairs now live in database_file; this dummy list just keeps the
        # downstream emptiness check and the later "del" happy
        read_pair_one_overlap_TE_list = [1, 1]
        if not print_extra_output:
            pass
            #os.remove(valid_discordant_reads_file_name)
    else:
        #here you would map mate reads to TE sequences and whatnot.
        pass

    time = datetime.datetime.now()
    print "elapsed time: " + str(time - start_time)
    ######################## wait till the proper pair bam file is sorted, and index it ###########################

    ##################### Cluster discordant read pairs that match a TE #####################
    # Cluster the list of AlignedReadPair objects that have one read overlapping a TE
    # according to the uniquely mapping non-TE genomic location,
    # then pair a fwd and rev cluster if they are overlapping.
    # For the clusters that can be paired, calculate the softclipped support and the core reads, which will indicate heterozygosity of the predicted TE insertion.

    # bed file to store the insertion regions, used to query the bam file later to calculate the zygosity
    bed_file_name = output_prefix + ".insertion_regions.bed"
    bed_file_handle = open(bed_file_name, "w")

    print "generating clusters..."

    if len(read_pair_one_overlap_TE_list) == 0:
        print "there might be an error: no discordant reads mapped to a TE location. Please check the gff file... are the chromosome names the same as in the reference?"
        sys.exit(2)
    cluster_list = ClusterList(read_pair_one_overlap_TE_list)

    #if not parallel:
    #    (cluster_pairs, unpaired_fwd_clusters, unpaired_rev_clusters) = cluster_list.generate_clusters(verbose, psorted_bamfile_name, bed_file_handle, True, min_cluster_size)
    #    all_clusters = [(cluster_pairs, unpaired_fwd_clusters, unpaired_rev_clusters)]
    ## parallel version:
    #else:
    #    # empty string is psorted bed file name, and True is streaming
    #    all_clusters = cluster_list.generate_clusters_parallel(verbose, num_CPUs, bin_size, "", bed_file_handle, True, min_cluster_size, output_prefix)
    #    # bed_file_handle.close()

    ##### Low-RAM alternative: prints out all the results at once and uses the sqlite database file from before.
    all_clusters = cluster_list.generate_clusters_db(database_file, bin_size, output_prefix, "", verbose, bed_file_handle, True, min_cluster_size)

    ### retrieve reads in the intervals where insertions were predicted and use them to calculate the allelic frequency (zygosity) of the predictions
    ins_regions_bam_name = output_prefix + ".insertion_regions.reads.bam"

    #################################
    # ATTEMPT TO REDUCE MEMORY
    #################################
    print proc.get_memory_info().rss
    del read_pair_one_overlap_TE_list
    del cluster_list
    gc.collect()
    timetime.sleep(30)
    print proc.get_memory_info().rss
    # with open(output_prefix + 'all_clusters.pkl', 'wb') as output:
    #     ### saving the biggest object in a file to avoid os.fork later
    #     pickle.dump(all_clusters, output, pickle.HIGHEST_PROTOCOL)
    #     #del all_clusters
    gc.collect()
    print proc.get_memory_info().rss

    #### print out all the existing variables ###
    #import pprint
    ##pprint.pprint(locals())
    #with open("locals_out.txt", "w") as fout:
    #    fout.write(pprint.pformat(locals()))

    ###############
    # for mem debug
    # return 0
    ###############
    ## construct list of args
    args = ["samtools", "view", "-hb", "-L", bed_file_name, "-o", ins_regions_bam_name, psorted_bamfile_name]
    # open subprocess
    int_bed_reads_select = subprocess.Popen(args)
    # wait till it finishes
    outcode = int_bed_reads_select.wait()
    #subprocess.call("samtools view -hb -L %s -o %s %s" % (bed_file_name, ins_regions_bam_name, psorted_bamfile_name), shell=True)
    gc.collect()

    if outcode == 0:
        print "retrieving reads overlapping bed annots successful"

        # construct list of args
        args = ["samtools", "index", ins_regions_bam_name]
        # open subprocess
        int_bed_reads_index = subprocess.Popen(args)
        # wait till it finishes
        outcode = int_bed_reads_index.wait()
        if outcode == 0:
            print "indexing successful"
        else:
            print "indexing failed"
        gc.collect()
    else:
        command = " ".join(args)
        print "retrieving reads overlapping bed annots failed! command: %s" % (command)
        sys.exit(1)
    insertion_regions_reads_bam = pysam.Samfile(ins_regions_bam_name, mode="rb")

    ###### ATTEMPT TO SAVE MEMORY, reload all_clusters ####
    print proc.get_memory_info().rss
    # with open(output_prefix + 'all_clusters.pkl', 'rb') as infile:
    #     all_clusters = pickle.load(infile)
    # NOTE: the pickle dump above is commented out, so there is no all_clusters.pkl to
    # reload here; keep the all_clusters returned by generate_clusters_db instead
    #all_clusters = list(load_pickle(output_prefix + 'all_clusters.pkl'))
    print proc.get_memory_info().rss
    ########################################################
    for (cluster_pairs, fwd, rev, string) in all_clusters:
        for cluster_pair in cluster_pairs:
            try:
                reads = insertion_regions_reads_bam.fetch(cluster_pair.get_chr(), cluster_pair.get_insertion_int_start(), cluster_pair.get_insertion_int_end())
                cluster_pair.calc_zygosity(reads)
            except:
                print "error calculating zygosity of: "
                print cluster_pair
                raise

    print "Done calculating zygosity of each cluster pair"

    time = datetime.datetime.now()
    print "elapsed time: " + str(time - start_time)
    ###################### print reads that were clustered to bam, output to gff and table ########################################
    print "writing clustered reads to bam file, writing to gff and tables... "

    # pickle_file = open(output_prefix + ".clusters_pairs.pickle", "w")
    # pickle.dump(cluster_pairs, pickle_file)
    # pickle_file.close()

    #output_prefix_clusters = "%s.d%d" % (output_prefix, num_sdev)

    ##### this was to write the final clustered reads to a separate bam file.
    #input_bam = pysam.Samfile(psorted_bamfile_name, mode="rb")
    #clustered_reads_bam_file = pysam.Samfile(output_prefix + ".final_clustered_reads.bam", mode="wb", referencenames=input_bam.references, referencelengths=input_bam.lengths)
    #input_bam.close()

    pair_gff_output_file = open(output_prefix + ".TE_insertions_paired_clusters.gff3", "w")
    pair_table_output_file = open(output_prefix + ".TE_insertions_paired_clusters.supporting_clusters.table", "w")
    pair_table_output_file.write(table_header(library_name, library_name, te_annot))

    # if print_extra_output:
    #     single_gff_output_file = open(output_prefix + ".TE_insertions_single_cluster.gff3", "w")
    #     single_table_output_file = open(output_prefix + ".TE_insertions_single_cluster.supporting_clusters.table", "w")
    #     single_table_output_file.write(table_header(library_name, library_name, te_annot))

    print len(all_clusters)
    cluster_ID = 0
    for (cluster_pairs, unpaired_fwd_clusters, unpaired_rev_clusters, strings) in all_clusters:
        # commented out b/c unpaired clusters are no longer reported
        # if print_extra_output:
        #     for fwd_cluster in unpaired_fwd_clusters:
        #         single_gff_output_file.write(fwd_cluster.to_gff(cluster_ID, library_name, TE_name_tag) + "\n")
        #         single_table_output_file.write(fwd_cluster.to_table(cluster_ID, library_name))
        #         cluster_ID += 1
        #     for rev_cluster in unpaired_rev_clusters:
        #         single_gff_output_file.write(rev_cluster.to_gff(cluster_ID, library_name, TE_name_tag) + "\n")
        #         single_table_output_file.write(rev_cluster.to_table(cluster_ID, library_name))
        #         cluster_ID += 1

        #print cluster_ID
        for cluster_pair in cluster_pairs:
            pair_gff_output_file.write(cluster_pair.to_gff(cluster_ID, library_name, TE_name_tag) + "\n")
            pair_table_output_file.write(cluster_pair.to_table(cluster_ID, library_name))
            cluster_ID += 1

    #clustered_reads_bam_file.close()

    time = datetime.datetime.now()
    print "elapsed time: " + str(time - start_time)

    pair_gff_output_file.close()
    pair_table_output_file.close()
    # if print_extra_output:
    #     single_gff_output_file.close()
    #     single_table_output_file.close()
    end_time = str(datetime.datetime.now())
    print "done! at " + end_time

    run_stats = open(output_prefix + ".run_stats.txt", "w")
    run_stats.write("lib\t%s\n" % (library_name))
    run_stats.write("coverage\t%s\n" % (coverage))
    run_stats.write("runtime\t%s\n" % (datetime.datetime.now() - start_time))
    run_stats.write("numCPUs\t%s\n" % (num_CPUs))
    run_stats.close()

    ####### delete the pickled file once everything is finished ####
    try:
        os.unlink(output_prefix + 'all_clusters.pkl')
    except:
        pass

def write_cluster_pairs_reads_to_bam(bam_file, cluster_pairs):
    for cluster_pair in cluster_pairs:
        read_pair_list = cluster_pair.fwd_cluster.readpair_list + cluster_pair.rev_cluster.readpair_list
        print len(read_pair_list)
        for read_pair in read_pair_list:
            print "read pair"
            print str(read_pair.read1)
            print str(read_pair.read2)
            bam_file.write(read_pair.read1)
            bam_file.write(read_pair.read2)


def write_single_clusters_to_bam(bam_file, fwd_clusters, rev_clusters):
    for cluster in fwd_clusters + rev_clusters:
        for read_pair in cluster.readpair_list:
            bam_file.write(read_pair.read1)
            bam_file.write(read_pair.read2)
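

# A minimal sketch of how run_jitterbug_streaming might be invoked from a driver script.
# All argument values below are illustrative assumptions, not defaults defined in this
# module; only the parameter names come from the function signature above.
#
#   run_jitterbug_streaming("sample.sorted.bam", verbose=True, te_annot="TE_annot.gff3",
#                           te_seqs=None, library_name="sample", num_sdev=2,
#                           output_prefix="sample_out", TE_name_tag="ID", parallel=False,
#                           num_CPUs=1, bin_size=50000000, min_mapq=15,
#                           generate_test_bam=False, print_extra_output=False,
#                           conf_lib_stats=None, mem=False, min_cluster_size=2)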