-
Notifications
You must be signed in to change notification settings - Fork 1
/
BGCtoolkit.py
executable file
·1696 lines (1447 loc) · 70.7 KB
/
BGCtoolkit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
"""
Handle Biosynthetic Gene Cluster objects
- Input:
* .gb/.gbk
* .bgc/.bgccase
* .fasta
- Filter:
* Include string
* Exclude string
* The user's own list (bgclist)
- Processing:
* Domain prediction
* Merge split domains
- Output:
* Figures (individual or stacked)
* BGC objects (.bgc or .bgccase)
* Metadata (list of protein ids)
* Sequences (core biosynthetic proteins + selected domains)
TODO:
* Convert fasta files into .bgcprotein or .bgcproteincase
"""
import os
import sys
from shutil import copyfile
from typing import Tuple
import json
import argparse
import pickle
from collections import defaultdict
from itertools import chain
# import shelve
from lxml import etree
from pathlib import Path
from multiprocessing import cpu_count
from BGClib import HMM_DB, BGC, BGCLocus, BGCCollection, ProteinCollection, \
ArrowerOpts, BGCDomain, valid_CBP_types, valid_CBP_types_antiSMASH, \
valid_CBP_types_fungal
from datetime import datetime
__author__ = "Jorge Navarro"
__version__ = "0.3.7"
__maintainer__ = "Jorge Navarro"
__email__ = "j.navarro@wi.knaw.nl"
# Prepare arguments for the script
def CMD_parser() -> argparse.Namespace:
    """
    Build the command-line interface of the script and parse its arguments.

    Options are organized in argparse groups: input, filtering, domain
    annotation, post-processing, basic output, SVG, biosynthetic-output
    organization, and file output.

    Returns:
        argparse.Namespace with all parsed options.
    """
    parser = argparse.ArgumentParser(description="BGC toolkit v{}. Tools to \
        facilitate biosynthetic gene cluster handling and \
        visualization.".format(__version__))

    # --- Input options ---
    group_input = parser.add_argument_group("Input")
    # TODO: change to --inputfoldersgbk? (and add --inputfoldersbgc?)
    group_input.add_argument("-i", "--inputfolders", nargs='+', type=Path, \
        help="Folder(s) to search (recursively) for .gb, .gbk and .gbff files.\
        If the file starts with generic terms ('scaffold', 'contig', \
        'Supercontig'), the parent folder's name will be used in the internal \
        BGC name for that file.")
    group_input.add_argument("-f", "--files", nargs='+', type=Path, \
        help="Input individual files (accepted: .gb .gbk, .bgc, .bgccase, \
        .fasta, .proteincase).")

    # --- Filtering options (applied to file names or internal BGC ids) ---
    group_filtering = parser.add_argument_group(title="Filtering",
        description="Include or exclude BGCs based on their names. Note:\
        for .bgc and .bgccase files, inclusion rules by --include, --exclude\
        and --bgclist will be applied to the internal BGC identifier, not to \
        the name of the file.")
    default_include = ['region', 'cluster']
    group_filtering.add_argument("--include", nargs='*', default=default_include,
        help=f"Specify string(s) to filter which BGCs will be included. In the \
        case of .gb or .gbk files, the filter is applied to the filename. For \
        data stored as .bgc or .bgccase files, the filter is applied to the \
        BGC(s) identifier. If the option is present but no arguments are \
        given, the filter will be ignored (all BGCs will be included). If the \
        argument is not present, the default is to use the strings: {default_include}",
        type=str)
    group_filtering.add_argument("--exclude", nargs='*', default=['final'],
        help="Specify string(s) to filter which BGCs will be rejected. \
        Similar rules are applied as with --include. If the argument is not \
        present, the default is to use 'final'.", type=str)
    group_filtering.add_argument("-l", "--bgclist", help="A tab-separated file \
        containing a list of BGC names (first column) and protein names (second\
        column). The BGC names filter input from --files and --inputfolder. If \
        only the protein column is present, it will be used to filter input from \
        .proteincase or .fasta files. If used with --svg and --stacked, the \
        contents of this file also define the order (and protein to align) in \
        the final SVG figure. Any extra columns or rows starting with the '#' \
        character will be ignored.", type=Path)

    # --- Domain annotation options ---
    group_processing = parser.add_argument_group("Domain annotation options")
    group_processing.add_argument("--hmms", nargs='*', help="List of paths to \
        .hmm file(s). This will also enable internal hmm models (use without \
        arguments to only use internal models).")
    group_processing.add_argument("--update_domains", help="Annotate domains, \
        even if input already has annotations. \
        If new and old hits overlap, only the best-scoring hit will be kept.", \
        default=False, action="store_true")
    group_processing.add_argument("--clear_domains", help="Clean all domain data \
        from input before domain prediction.", default=False, action="store_true")
    group_processing.add_argument("-c", "--cpus", type=int, default=cpu_count(),
        help="Number of CPUs used for domain prediction. Default: all available.")
    # Default alias file shipped alongside the bundled BGClib data
    internal_alias_file = Path(__file__).parent / "BGClib" / "data" / "CBP_domains.tsv"
    group_processing.add_argument("--alias", type=Path, help=f"Argument is a \
        tab-separated file with domain alias. First column: domain ID, Second \
        column: alias. If domain prediction is used, this information will also \
        be stored (e.g. to use afterwards when making SVG figures). Default \
        aliases are stored in the special file {internal_alias_file}", \
        dest='alias_file')
    group_processing.add_argument("--fungal_cbp", help="Uses internal \
        domain-based rules to re-annotate all core biosynthetic protein types. \
        Intended to be used only on fungal sets.", default=False, \
        action="store_true")

    # --- Post-processing options ---
    group_post_proc = parser.add_argument_group("Post-processing options")
    group_post_proc.add_argument("--merge", default=False, action="store_true",
        help="Try to fix successive domains of the same type that have been \
        split. This could be useful for phylogenetic analysis but will not \
        fix incorrect gene predictions. Use with care.")
    # TODO:
    # group_post_proc.add_argument("--jsonfolders", nargs='+', type=Path, \
    #     help="Use MIBiG-style JSON files to annotate metadata (organism, \
    #     metabolites, literature)")

    # --- Basic output options ---
    group_output = parser.add_argument_group(title="Output",
        description="Basic output options")
    default_output = Path("./") / "output"
    group_output.add_argument("-o", "--outputfolder",
        default=default_output, help=f"Base folder where \
        results will be put (default='{default_output}').")
    group_output.add_argument("--metadata", type=str, help="Writes \
        information files at three levels: a summary of the whole collection, \
        a summary per BGC (CBP content) and a summary per core protein (domain \
        organization). Argument is the basename of these files (no extension).\
        Activated by default if --bgccase is used.")

    # --- SVG figure options ---
    group_svg = parser.add_argument_group("SVG options")
    group_svg.add_argument("--svg", default=False,
        action="store_true", help="Toggle to enable SVG output for each BGC.")
    default_svg_cfg = Path(__file__).parent/"SVG_arrow_options.cfg"
    group_svg.add_argument("--svgcfg", type=Path, default=default_svg_cfg,
        help=f"Configuration file with SVG style. Default: \
        '{default_svg_cfg}'")
    group_svg.add_argument("-m", "--mirror", default=False,
        action="store_true", help="Toggle to mirror each BGC figure. Ignored \
        with --stacked or --bgclist")
    group_svg.add_argument("-s", "--stacked", type=str, help="If used with \
        --svg, all BGC SVGs will be put in the same figure. The argument of \
        this parameter is the filename (no extension).")
    # NOTE: if we have more than one comparison method, use
    # "choices=['rock', 'paper', 'scissors']"
    # group_svg.add_argument("--comparative", default=False, action="store_true",
    #     help="If --stacked and --bgclist are used, calculate protein similarity \
    #     between each pair of BGCs and represent it as connecting bands.")
    group_svg.add_argument("--gaps", default=False, action="store_true",
        help="If --stacked is used, toggle this option to leave gaps\
        when a particular BGC or protein is not found in the input data \
        (--bgclist).")

    # --- Organize output by Core Biosynthetic Type (CBT) ---
    group_organize = parser.add_argument_group(title="Organize biosynthetic output", \
        description="(Optional) Use these parameters with the remaining output \
        options to separate data into different sub-folders according to \
        selected Core Biosynthetic Type(s) (CBTs). If any of these options are \
        used, the final biosynthetic types used will be those common between \
        a) the types in the dataset, b) the ones in the cbt file and c) the\
        ones in the cbt-include list, minus those from the cbt-exclude option.\
        Currently supported types from antiSMASH: {}. \nCurrently supported \
        fungal types (these will need options --hmms and --fungal_cbp): {}.".format(\
        ", ".join(["'{}'".format(x) for x in sorted(valid_CBP_types_antiSMASH.keys())]), \
        ", ".join(["'{}'".format(x) for x in sorted(valid_CBP_types_fungal.keys())])))
    group_organize.add_argument("--cbt-file", type=Path, nargs='?',
        const=(Path(__file__).parent/"CBP_output_types.cfg"), \
        help="A file with valid core biosynthetic types. If no argument \
        is given, the file 'CBP_output_types.cfg' will be read.")
    # TODO explain alias
    group_organize.add_argument("--cbt-include", type=str, nargs='*', \
        help="A space-separated list of core biosynthetic types to use when \
        organizing output. If 'all' or not argument is given, all posible CBTs \
        will be used. Additionally for this parameter, it is possible to \
        search for specific domains (e.g. 'all:C' or 'nrPKS:SAT'). Domains \
        may be the model's ID, the stored alias, or the alias given in the \
        file specified with --alias")
    group_organize.add_argument("--cbt-exclude", type=str, nargs='+', \
        help="A space-separated list of all the CBTs to exclude.")

    # --- File output options ---
    group_file_output = parser.add_argument_group(title="File output", \
        description="These options will trigger metadata output")
    group_file_output.add_argument("--bgc", default=False,
        action="store_true", help="Toggle to save binary (.bgc) files of each \
        BGC (from .bgc, .bgccase and GenBank files). Data from fasta input \
        files will not be included")
    group_file_output.add_argument("--bgccase", type=str, help="Output a single \
        binary file with all BGCs (.bgccase). The argument of this parameter \
        is the filename (no extension). Data from fasta input files will not \
        be included.")
    group_file_output.add_argument("-p", "--proteincase", type=str, \
        help="Output .proteincase files for each type of Core Biosynthetic\
        Protein type specified in the file 'CBP_output_types.cfg'. The \
        argument of this parameter is the basename for all .proteincase \
        files")
    group_file_output.add_argument("--genbank", default=False, \
        action="store_true", help="Toggle to create GenBank files. Currently, \
        only works for gbk input files. Use with 'cbt' options to separate by\
        biosynthetic type.")
    group_organize.add_argument("--cbt-fasta", default=False, action="store_true",
        help="Toggle to output sequences of (specialized metabolite's) Core \
        Biosynthetic Proteins for each CBT defined in the \
        'CBP_output_types.cfg' file. Only available when cbt* options are \
        enabled.")
    return parser.parse_args()
def sanitize(filename: str) -> str:
    """
    Return a copy of `filename` that is safe to use as a file name.

    Characters that are illegal in file names on common platforms are
    replaced with underscores.
    """
    # Includes '?' and '"', which the previous set omitted but which are
    # also invalid in Windows file names.
    illegal = '/|\\:<>*?"'
    # One C-level pass instead of chained .replace() calls
    return filename.translate(str.maketrans(illegal, "_" * len(illegal)))
# Basic validation of input files
def check_parameters(args) -> None:
    """
    Validate the parsed command-line arguments and verify that all the
    paths given by the user actually exist.

    Exits the program with a message on the first invalid parameter.
    """
    # At least one source of input data is required
    if not args.files and not args.inputfolders:
        sys.exit("\nStop: no input data. See options using -h")

    # At least one output option is required
    output_flags = (args.svg, args.bgc, args.bgccase, args.proteincase,
                    args.metadata, args.cbt_fasta, args.genbank)
    if not any(output_flags):
        sys.exit("\nStop: no output options were selected (use --svg, --bgc, etc.)")

    if args.cpus < 1:
        sys.exit("Error: parameter --cpus must be a positive integer")

    if args.cbt_file and not args.cbt_file.is_file():
        sys.exit("Error (--cbt-file): cannot open specified file")

    if args.alias_file and not args.alias_file.is_file():
        sys.exit("Error (--alias): cannot open specified file")

    # Every individually-listed input must be an existing file
    for file_ in (args.files or []):
        f = Path(file_)
        if not f.is_file():
            sys.exit(f"Error (--files): {f} is not a file")

    # Every input folder must be an existing directory
    for folder in (args.inputfolders or []):
        if not Path(folder).is_dir():
            sys.exit(f"Error (--inputfolders): {folder} is not a folder")

    # hmm models: must exist and carry a .hmm extension
    for hmm in (args.hmms or []):
        hmm_file = Path(hmm)
        if not hmm_file.is_file():
            sys.exit(f"Error (--hmm): {hmm_file} is not a file")
        if hmm_file.suffix not in {".hmm", ".HMM"}:
            sys.exit(f"Error (--hmm): {hmm_file} does not have a .hmm extension")

    if args.bgclist and not Path(args.bgclist).is_file():
        sys.exit(f"Error: (--bgclist): {args.bgclist} is not a file")

    # Style options are only relevant when SVG output is requested
    if args.svg and not args.svgcfg.is_file():
        sys.exit(f"Error (--svgcfg): {args.svgcfg} is not a file")

    if args.metadata and args.bgccase:
        print("Warning: --metadata and --bgccase. The argument of --metadata will be ignored")
# Input stage
def valid_name(name: str, include: list, exclude: list, filter_bgc: list) -> bool:
    """
    Check whether a BGC/file name should be included in the analysis.

    A name is valid when:
    * it contains at least one of the strings in `include` (if any given),
    * it contains none of the strings in `exclude`, and
    * it appears in `filter_bgc` (if a non-empty filter collection is given).

    The parameter `name` is expected to be the name of the BGC or the file,
    without extension.
    """
    if include and not any(word in name for word in include):
        return False
    # Note: a redundant `exclude != []` test was removed; it is implied by
    # the truthiness check.
    if exclude and any(word in name for word in exclude):
        return False
    if filter_bgc and name not in filter_bgc:
        return False
    return True
def read_bgc_list(bgclist: Path) -> list:
    """
    Read the user's BGC/protein filter list.

    The file is tab-separated:
    * First column: BGC identifier (may be empty).
    * Second column: protein identifier or protein_id (may be empty).
    * Any further columns, and lines starting with '#', are ignored.

    Returns a list of (bgc_id, protein_id) tuples. Exits the program if the
    file contains no usable rows.
    """
    filterlist = list()
    with open(bgclist) as f:
        for line in f:
            if line[0] == "#" or line.strip() == "":
                continue
            xline = line.split("\t")
            if len(xline) > 1:
                # assume second column is protein_id
                filterlist.append((xline[0].strip(), xline[1].strip()))
            else:
                # Bug fix: single-column rows previously kept the trailing
                # newline in the BGC identifier; strip it like the others.
                filterlist.append((xline[0].strip(), ""))
    if len(filterlist) == 0:
        sys.exit("Error: filter BGC list given but the file is empty...")
    return filterlist
def read_alias_file(alias_file: Path) -> dict:
    """
    Parse a tab-separated file mapping domain IDs to their aliases.

    Lines starting with '#', empty lines and lines with fewer than two
    columns are skipped. Returns a dict {domain_ID: alias}.
    """
    aliases = dict()
    with open(alias_file) as handle:
        for raw_line in handle:
            if raw_line[0] == "#" or raw_line.strip() == "":
                continue
            columns = raw_line.strip().split("\t")
            if len(columns) < 2:
                # there should be at least two columns, separated by tabs
                continue
            aliases[columns[0]] = columns[1]
    return aliases
def get_files(inputfolders: list, files: list, include: list, exclude: list, filter_bgc_prot: list) -> Tuple[BGCCollection, ProteinCollection, dict]:
    """
    Collect all data from the input, using filters defined by user to produce
    the collection of data to work with.

    Returns a 3-tuple (bgc_collection, protein_collection, gbk_files), where
    gbk_files maps BGC identifiers to the GenBank file paths they came from.
    (The return annotation previously claimed a 2-tuple.)
    """
    # If these words are found at the beginning of the filename (which can
    # typically happen in generic antiSMASH output), append the parent folder's
    # name to the BGC identifier
    forbidden_words = {'scaffold', 'contig', 'Supercontig', 'c00'}
    bgc_col = BGCCollection()
    prot_col = ProteinCollection()
    gbk_files = dict()
    all_bgcs = defaultdict(list)  # includes repeated bgc ids

    # Build filter sets for both BGCs and proteins
    filter_bgc_set = set()
    filter_prot = set()
    for bgc_id, protein_id in filter_bgc_prot:
        if bgc_id != "":
            filter_bgc_set.add(bgc_id)
        else:
            filter_prot.add(protein_id)

    if inputfolders:
        for folder in inputfolders:
            for gbk_file in chain(folder.glob("**/*.gb"), folder.glob("**/*.gbk")):
                if valid_name(gbk_file.stem, include, exclude, filter_bgc_set):
                    identifier = gbk_file.stem
                    if any(identifier.startswith(fword) for fword in forbidden_words):
                        identifier = f"{gbk_file.parts[-2]}.{identifier}"
                    gbk_files[identifier] = gbk_file
                    all_bgcs[identifier].append(gbk_file)

    if files:
        for f in files:
            if f.suffix.lower() in {".gb", ".gbk", ".gbff"}:
                if valid_name(f.stem, include, exclude, filter_bgc_set):
                    identifier = f.stem
                    if any(identifier.startswith(fword) for fword in forbidden_words):
                        identifier = f"{f.resolve().parts[-2]}.{identifier}"
                    gbk_files[identifier] = f
                    all_bgcs[identifier].append(f)
            elif f.suffix.lower() == ".bgc":
                with open(f, "rb") as dc:
                    bgc = pickle.load(dc)
                # make sure it's a valid bgc:
                assert(isinstance(bgc, BGC))
                bgc_id = bgc.identifier
                if valid_name(bgc_id, include, exclude, filter_bgc_set):
                    bgc_col.bgcs[bgc_id] = bgc
                    all_bgcs[bgc_id].append(f)
                    # a .bgc file overrides a gbk with the same identifier
                    if bgc_id in gbk_files:
                        print(f"\tWarning: BGC {bgc_id} has been found in a .gbk/.gb file and a .bgc file (using the latter)")
                        del gbk_files[bgc_id]
            elif f.suffix.lower() == ".bgccase":
                # TODO: fix shelves to always use the same backend or find
                # another solution
                with open(f, "rb") as dc:
                    col = pickle.load(dc)
                # make sure it's a valid collection
                assert(isinstance(col, BGCCollection))
                # if we've got a filter list, it's faster to use it
                if filter_bgc_set:
                    for bgc_id in filter_bgc_set:
                        try:
                            bgc = col.bgcs[bgc_id]
                        except KeyError:
                            pass
                        else:
                            # still have to check include and exclude filters
                            if valid_name(bgc_id, include, exclude, set()):
                                bgc_col.bgcs[bgc_id] = bgc
                                all_bgcs[bgc_id].append(f)
                                # if we already had it in a gbk file, use this version instead
                                if bgc_id in gbk_files:
                                    print(f"\tWarning: BGC {bgc_id} has been found in a .gbk/.gb file and a .bgccase collection (using the latter)")
                                    del gbk_files[bgc_id]
                else:
                    for bgc_id, bgc in col.bgcs.items():
                        if valid_name(bgc_id, include, exclude, filter_bgc_set):
                            bgc_col.bgcs[bgc_id] = bgc
                            all_bgcs[bgc_id].append(f)
                            # if we already had it in a gbk file, use this version instead
                            if bgc_id in gbk_files:
                                print(f"\tWarning: BGC {bgc_id} has been found in a .gbk/.gb file and a .bgccase collection (using the latter)")
                                del gbk_files[bgc_id]
            elif f.suffix.lower() == ".fasta":
                prot_col.fasta_load(f)
            elif f.suffix.lower() == ".proteincase":
                with open(f, 'rb') as pc:
                    temp_prot_col = pickle.load(pc)
                assert(isinstance(temp_prot_col, ProteinCollection))
                if temp_prot_col.proteins.keys() & prot_col.proteins.keys():
                    print("\tWarning! Over-writing proteins with the same header. Make sure you have unique headers in all your protein inputs. This will probably result in unintended results")
                # Bug fix: a duplicated call previously passed the collection
                # object itself to dict.update(); only the .proteins dict
                # should be merged.
                prot_col.proteins.update(temp_prot_col.proteins)
            else:
                print(f"\tWarning: unknown format ({f})")

    # Applying filtering to the protein collection. This is tricky because
    # the filtering may include protein_ids, so we have to traverse the
    # whole collection. Also, more practical to do it with all the proteins
    # already in a collection (as opposed to filtering-while-adding-them as
    # with BGCs)
    if filter_prot:
        final_prot_col = ProteinCollection()
        for protein in prot_col.proteins.values():
            if protein.identifier in filter_prot or protein.protein_id in filter_prot:
                final_prot_col.proteins[protein.identifier] = protein
    else:
        final_prot_col = prot_col

    # Report repeated BGCs
    print_warning_repeated = any(len(fl) > 1 for fl in all_bgcs.values())
    if print_warning_repeated:
        # NOTE(review): 'o' is not defined in this function; it presumably
        # refers to a module-level output folder Path set in __main__ —
        # confirm, otherwise this branch raises NameError.
        rbl_file = o / "repeated_bgc_locations.txt"
        print(f"\tWarning: multiple files for the same BGC were found. See file {rbl_file}")
        with open(rbl_file, "w") as rbl:
            for bgc_id, file_list in all_bgcs.items():
                if len(file_list) > 1:
                    rbl.write(f"{bgc_id}\n")
                    for f in file_list:
                        rbl.write(f"\t{f}\n")
                    rbl.write("\n")

    # Finally, add the gbk files to the BGC collection
    for identifier, gbk_path in gbk_files.items():
        bgc_col.add_gbk(gbk_path, identifier)
    return bgc_col, final_prot_col, gbk_files
def predict_domains(clear_domains: bool, update_domains: bool, hmmdbs: HMM_DB, bgc_collection: BGCCollection, protein_collection: ProteinCollection):
    """
    Run domain prediction on BGCs and proteins depending on parameters.

    * clear_domains: wipe existing domain predictions first, then re-predict
      for every object. The protein role and type attributes are NOT cleared.
    * update_domains: re-predict for every object without clearing first.
    * Neither: only predict for objects whose 'attempted_domain_prediction'
      flag is unset (rather than checking whether any domain is currently
      present).

    Both collections are modified in place.
    """
    bgc_collection_need_dom_pred = BGCCollection()
    protein_collection_need_dom_pred = ProteinCollection()
    if clear_domains or update_domains:
        # Need to convert the key views to lists b/c items are popped
        # inside the loops
        for bgc_id in list(bgc_collection.bgcs.keys()):
            # Bug fix: this previously read the module-level 'args' object
            # (args.clear_domains) instead of the function parameter.
            if clear_domains:
                bgc_collection.bgcs[bgc_id].clear_domain_predictions()
            bgc_collection_need_dom_pred.bgcs[bgc_id] = bgc_collection.bgcs.pop(bgc_id)
        for protein_identifier in list(protein_collection.proteins.keys()):
            if clear_domains:
                protein_collection.proteins[protein_identifier].clear_domain_predictions()
            protein_collection_need_dom_pred.proteins[protein_identifier] = protein_collection.proteins.pop(protein_identifier)
    else:
        # nothing selected: only predict domains for "clean" objects
        for bgc_id in list(bgc_collection.bgcs.keys()):
            if not bgc_collection.bgcs[bgc_id].attempted_domain_prediction:
                bgc_collection_need_dom_pred.bgcs[bgc_id] = bgc_collection.bgcs.pop(bgc_id)
        for protein_identifier in list(protein_collection.proteins.keys()):
            if not protein_collection.proteins[protein_identifier].attempted_domain_prediction:
                protein_collection_need_dom_pred.proteins[protein_identifier] = protein_collection.proteins.pop(protein_identifier)

    print("\tPredicting domains...")
    bgc_collection_need_dom_pred.predict_domains(hmmdbs, cpus=hmmdbs.cores, tc=True)
    protein_collection_need_dom_pred.predict_domains(hmmdbs, cpus=hmmdbs.cores, tc=True)
    print("\t...done!\n")

    # Put the (now annotated) objects back into the caller's collections
    bgc_collection.bgcs.update(bgc_collection_need_dom_pred.bgcs)
    protein_collection.proteins.update(protein_collection_need_dom_pred.proteins)
    return
# Post-processing step
def _find_split_domain_runs(protein) -> list:
    """
    Scan one protein's (position-sorted) domain list and return a list of
    runs, each run being a list of consecutive domain objects with the same
    model ID whose hmm coordinates suggest one domain split into fragments
    (each fragment ends, in hmm coordinates, before the next one begins).
    """
    runs = []
    consecutive_domain_list = []  # consecutive dom objs relative to hmm
    current_id = ""
    for current_dom_num in range(len(protein.domain_list) - 1):
        current_dom = protein.domain_list[current_dom_num]
        if current_id != current_dom.ID:
            if consecutive_domain_list:
                # a new type of dom was found; save the list of fragments
                runs.append(consecutive_domain_list)
                consecutive_domain_list = []
            # look ahead to see if we need to start a new list
            next_dom = protein.domain_list[current_dom_num + 1]
            if current_dom.ID == next_dom.ID and \
                    current_dom.hmm_to < next_dom.hmm_from:
                consecutive_domain_list.append(current_dom)
            current_id = current_dom.ID
        # Current and previous domains are the same type. Check coords.
        elif consecutive_domain_list:
            if consecutive_domain_list[-1].hmm_to < current_dom.hmm_from:
                consecutive_domain_list.append(current_dom)
    # check last domain
    current_dom = protein.domain_list[-1]
    if current_id != current_dom.ID:
        if consecutive_domain_list:
            runs.append(consecutive_domain_list)
    # could have same ID but hmm coordinates don't indicate splitting
    elif consecutive_domain_list:
        if consecutive_domain_list[-1].hmm_to < current_dom.hmm_from:
            consecutive_domain_list.append(current_dom)
        runs.append(consecutive_domain_list)
    return runs


def fix_core_split_domains(bgc_col, prot_col):
    """
    Finds repeated (split) domains and tries to merge each run into a
    single domain. Currently applied only to core biosynthetic proteins.

    The previously duplicated scan logic now lives in
    _find_split_domain_runs().
    """
    # key: BGCProtein.
    # Value: list whose elements are themselves lists of consecutive domain IDs
    protein_to_broken_doms = defaultdict(list)

    # collect data from core proteins inside BGCs
    for bgc in bgc_col.bgcs.values():
        for cbp_type in bgc.CBPcontent:
            for protein in bgc.CBPcontent[cbp_type]:
                if not protein.domain_list:
                    continue
                runs = _find_split_domain_runs(protein)
                if runs:
                    protein_to_broken_doms[protein].extend(runs)

    # check now proteins from Protein Collection
    for protein in prot_col.proteins.values():
        if not protein.role == "biosynthetic":
            continue
        # Bug fix: this branch previously lacked the empty-domain_list guard
        # present in the BGC branch, so a biosynthetic protein without
        # domains raised IndexError on domain_list[-1].
        if not protein.domain_list:
            continue
        runs = _find_split_domain_runs(protein)
        if runs:
            protein_to_broken_doms[protein].extend(runs)

    # try to stitch fragments
    for protein, runs in protein_to_broken_doms.items():
        for consecutive_domain_list in runs:
            first_dom = consecutive_domain_list[0]
            last_dom = consecutive_domain_list[-1]
            # Warning! Score of the new domain is naively assigned to be the
            # simple sum of the scores of its parts
            merged_domain = BGCDomain(protein, first_dom.ID, \
                first_dom.AC, first_dom.DE, first_dom.alias, \
                first_dom.ali_from, last_dom.ali_to, first_dom.hmm_from, \
                last_dom.hmm_to, 0, \
                sum(d.score for d in consecutive_domain_list), 0, "")
            for d in consecutive_domain_list:
                protein.domain_list.remove(d)
            protein.domain_list.append(merged_domain)
            protein.domain_list.sort(key=lambda dom: dom.ali_from)
# Output: svg options
def draw_svg_individual(
        o: Path,
        bgc_col: BGCCollection,
        prot_col: ProteinCollection,
        filter_bgc_prot: list,
        hmmdbs: HMM_DB,
        svgopts: ArrowerOpts,
        mirror: bool
        ) -> None:
    """
    Draw one SVG file per BGC and per stand-alone protein into folder `o`.

    `filter_bgc_prot` is a list of (bgc_id, protein_id) tuples: rows with a
    BGC id select which BGCs to draw (and optionally which protein decides
    mirroring); rows with only a protein id select stand-alone proteins.
    """
    # Split the combined filter into per-kind lookups
    filter_bgc = dict()
    filter_prot = set()
    for bgc_id, protein_id in filter_bgc_prot:
        if bgc_id != "":
            filter_bgc[bgc_id] = protein_id
        else:
            filter_prot.add(protein_id)

    # BGCs
    filtered_bgc_col = BGCCollection()
    # Use filter_bgc (if needed) to build the collection with bgcs we need to visualize
    if filter_bgc:
        for bgc_id in (filter_bgc.keys() & bgc_col.bgcs.keys()):
            filtered_bgc_col.bgcs[bgc_id] = bgc_col.bgcs[bgc_id]
    else:
        filtered_bgc_col = bgc_col
    for bgc_id, bgc in filtered_bgc_col.bgcs.items():
        m = mirror
        # see if we need to mirror the cluster: if a reference protein was
        # given and it lies on the reverse strand, flip the figure
        if bgc_id in filter_bgc:
            pid = filter_bgc[bgc_id]
            if pid != "":
                for p in bgc.protein_list:
                    if pid == p.protein_id or pid == p.identifier:
                        if not p.forward:
                            m = True
                        break
        coregenearch = ""
        if len(bgc.CBPtypes) > 0:
            coregenearch = f"_[{','.join(bgc.CBPtypes)}]"
        m_info = ""
        if m:
            m_info = "_m"
        bgc_name = o / f"{bgc_id}{coregenearch}{m_info}.svg"
        bgc.BGC_SVG(bgc_name, hmmdbs, svgopts, mirror=m)

    # Proteins. Id can be protein_id or identifier
    filtered_prot_col = ProteinCollection()
    if filter_prot:
        # Bug fix: previously iterated over '.values' (the bound method
        # object) instead of calling '.values()', raising a TypeError.
        for protein in prot_col.proteins.values():
            if protein.identifier in filter_prot or protein.protein_id in filter_prot:
                filtered_prot_col.proteins[protein.identifier] = protein
    else:
        filtered_prot_col = prot_col
    for pid, protein in filtered_prot_col.proteins.items():
        cbp_arch = ""
        if protein.role == "biosynthetic":
            cbp_arch = f"_[{protein.protein_type}]"
        protein_name = o / f"{sanitize(pid)}{cbp_arch}.svg"
        protein.arrow_SVG(protein_name, hmmdbs, svgopts)
    return
def draw_svg_stacked(
    o: Path,
    stacked_name: str,
    bgc_col: BGCCollection,
    prot_col: ProteinCollection,
    filter_bgc_prot: list,
    hmmdbs: HMM_DB,
    svgopts: ArrowerOpts,
    gaps: bool,
    warning: bool=False
) -> None:
    """
    Draws a stacked BGC figure.

    Each entry of filter_bgc_prot is a (bgc_id, protein_id) pair drawn as
    one row. A non-empty protein_id marks the reference protein used for
    horizontal alignment (and mirroring when it is on the reverse strand).
    Pairs with an empty bgc_id are stand-alone proteins, converted to
    single-locus pseudo-BGCs for drawing.

    Parameters:
    - o: output directory; the file is written as {stacked_name}.svg
    - gaps: if True, entries that could not be resolved still consume a
      (blank) row in the figure
    - warning: print diagnostics for ids that cannot be resolved
    """
    # Discard "trivial" case of not having an ordered list of BGCs/proteins
    if len(filter_bgc_prot) == 0:
        return draw_svg_stacked_simple(o, stacked_name, bgc_col, prot_col, \
            hmmdbs, svgopts)

    filename = o / f"{stacked_name}.svg"
    thickness = svgopts.gene_contour_thickness

    # Read original filtering options and build parallel, per-row lists:
    # the BGC to draw (or None), its drawn length, whether it must be
    # mirrored, and the distance from its start to the reference protein
    # (-1 when there is no reference). Keeping all four lists the same
    # length as draw_order_bgcs fixes an index-misalignment bug in the
    # original, which skipped appends for unresolved ids but later indexed
    # these lists with the row number.
    draw_order_bgcs = list()
    scaling = svgopts.scaling
    H = svgopts.arrow_height  # used for loci spacing
    bgc_lengths = dict()
    ordered_lengths = list()
    needs_mirroring = list()
    bgc_distance_to_target = list()
    start_to_target_max_offset = 0
    for bgc_id, pid in filter_bgc_prot:
        if bgc_id != "":
            if bgc_id not in bgc_col.bgcs:
                if warning:
                    print(f"\tSVG (stacked, bgclist): --bgclist used but {bgc_id} not found in BGC data")
                draw_order_bgcs.append(None)
                # keep parallel lists aligned with draw_order_bgcs
                ordered_lengths.append(0)
                needs_mirroring.append(False)
                bgc_distance_to_target.append(-1)
                continue

            bgc = bgc_col.bgcs[bgc_id]
            draw_order_bgcs.append(bgc)
            # second term in the addition: inter-loci spacing element
            L = sum([locus.length/scaling for locus in bgc.loci]) \
                + H * (len(bgc.loci)-1) \
                + thickness
            bgc_lengths[bgc_id] = L
            ordered_lengths.append(L)

            if pid == "":
                if warning:
                    print(f"\tSVG (stacked, bgclist): Warning, {bgc_id} has no reference Protein Id")
                needs_mirroring.append(False)
                bgc_distance_to_target.append(-1)
                continue

            protein = None
            for locus_num, locus in enumerate(bgc.loci):
                for candidate_protein in locus.protein_list:
                    if pid == candidate_protein.protein_id \
                            or pid == candidate_protein.identifier:
                        needs_mirroring.append(not candidate_protein.forward)
                        protein = candidate_protein

                        # get start_to_target distance
                        if protein.forward:
                            # lengths of each locus before the target's +
                            # inter-loci spacing + distance of current
                            # locus start to target protein
                            start_to_target = \
                                sum([l.length/scaling for l in bgc.loci[:locus_num]]) \
                                + H * locus_num \
                                + protein.cds_regions[0][0]/scaling
                        else:
                            # mirrored: measure from the other end
                            start_to_target = \
                                sum([l.length/scaling for l in bgc.loci[locus_num+1:]]) \
                                + H * (len(bgc.loci) - locus_num - 1) \
                                + (locus.length - protein.cds_regions[-1][1])/scaling
                        bgc_distance_to_target.append(start_to_target)
                        if start_to_target > start_to_target_max_offset:
                            start_to_target_max_offset = start_to_target
                        break
                if protein is not None:
                    # Bug fix: also leave the loci loop; the original only
                    # broke the inner loop and could append twice if the
                    # same id matched a protein in another locus.
                    break

            # Couldn't find protein specified by user; typo?
            if protein is None:
                print(f"\tSVG (stacked, bgclist): Warning, cannot find reference Protein Id [{pid}] for {bgc_id}")
                needs_mirroring.append(False)
                bgc_distance_to_target.append(-1)
                continue

        # got stand-alone protein, convert to BGC
        else:
            protein = None
            try:
                # Easy case: given pid is internal identifier
                protein = prot_col.proteins[pid]
            except KeyError:
                # then pid comes from annotations? look for protein_id.
                # Bug fix: the original compared the dict *key* to pid,
                # which the try-lookup had already done.
                for candidate_protein in prot_col.proteins.values():
                    if candidate_protein.protein_id == pid:
                        protein = candidate_protein
                        break

            if protein is None:
                print(f"\tSVG (stacked, bgclist): Warning, stand-alone protein {pid} not found")
                draw_order_bgcs.append(None)
                ordered_lengths.append(0)
                needs_mirroring.append(False)
                bgc_distance_to_target.append(-1)
                continue

            L = (protein.cds_regions[-1][1] - protein.cds_regions[0][0])/scaling + thickness

            # Create fake single-locus BGC wrapping the protein
            locus = BGCLocus()
            locus.length = L
            locus.protein_list.append(protein)
            locus.gene_coordinates.append((protein.cds_regions[0][0], protein.cds_regions[-1][1]))

            bgc = BGC()
            bgc.identifier = protein.identifier
            bgc.loci.append(locus)
            bgc.protein_list.append(protein)
            bgc.proteins[protein.identifier] = protein

            draw_order_bgcs.append(bgc)
            bgc_lengths[bgc.identifier] = L
            ordered_lengths.append(L)
            needs_mirroring.append(False)
            bgc_distance_to_target.append(0)

    # obtain max_L considering all the starting offsets.
    # Bug fix: the original read the stale loop variable `bgc_id` here, so
    # every row used the length of the *last* BGC from the loop above.
    max_L = 0
    for length, distance in zip(ordered_lengths, bgc_distance_to_target):
        if distance == -1:
            max_L = max(max_L, length)
        else:
            max_L = max(max_L, length
                        + start_to_target_max_offset
                        - distance)

    # Start SVG internal structure
    row_height = 2*svgopts.arrow_height  # one for the arrow, 0.5 + 0.5 for the head height
    inner_row_height = row_height + thickness
    base_attribs = {"version": "1.1",
                    "baseProfile": "full",
                    "width": str(int(max_L))
                    }
    root = etree.Element("svg", attrib=base_attribs, nsmap={None:'http://www.w3.org/2000/svg'})

    # Add each figure
    Yoffset = 0
    rows = 0
    for bgc_num, item in enumerate(draw_order_bgcs):
        Yoffset = rows * inner_row_height

        if item is None:
            if gaps:
                rows += 1
            continue
        else:
            bgc = item
            bgc_id = bgc.identifier

        # Marked BGCs with no reference Protein Id won't have offset
        Xoffset = 0
        if bgc_distance_to_target[bgc_num] != -1:
            Xoffset = start_to_target_max_offset - bgc_distance_to_target[bgc_num]

        root.append(bgc.xml_BGC(Xoffset, Yoffset, hmmdbs, svgopts, needs_mirroring[bgc_num]))
        rows += 1

    # Now that we know how many BGCs were actually drawn, add height property
    root.attrib["height"] = str(int(thickness + rows*(row_height+thickness)))

    # Write SVG
    with open(filename, "bw") as f:
        f.write(etree.tostring(root, pretty_print=True))
def draw_svg_stacked_simple(
o: Path,
stacked_name: str,
bgc_col: BGCCollection,
prot_col: ProteinCollection,
hmmdbs: HMM_DB,
svgopts: ArrowerOpts
) -> None:
"""
Draws a stacked BGC figure.
In this simple version, no order or alignment are given; all BGCs, then
proteins, will be drawn in arbitrary order
"""
filename = o / f"{stacked_name}.svg"
thickness = svgopts.gene_contour_thickness
# Now obtain a list of protein objects that will be used for alignment
# Also get information about mirroring and offset distancing
scaling = svgopts.scaling
H = svgopts.arrow_height # used for loci spacing
# find max_L
max_L = 0
for bgc in bgc_col.bgcs.values():
L = sum([locus.length/scaling for locus in bgc.loci]) \
+ H * (len(bgc.loci)-1) \
+ thickness
if L > max_L:
max_L = L
for protein in prot_col.proteins.values():
L = (protein.cds_regions[-1][1] - protein.cds_regions[0][0])/scaling + thickness
if L > max_L:
max_L = L
# Start SVG internal structure
row_height = 2*svgopts.arrow_height # one for the arrow, 0.5 + 0.5 for the head height
inner_row_height = row_height + thickness
base_attribs = {"version":"1.1",
"baseProfile":"full",
"width":str(int(max_L))