From 4ad77dd555e98ddfbbc612d6d53ad12d8106293a Mon Sep 17 00:00:00 2001
From: Jake Bradford
Date: Mon, 28 Mar 2022 11:35:05 +1000
Subject: [PATCH] bug: an OSError could be thrown when merging the sorted
 off-target site files if too many file handles are open

- I wrote a fix that merges the files in batches.
- I also changed how CLI arguments are parsed (from `sys.argv` to
  `argparse`). This also provides `--help` documentation to the user.
---
 src/crackling/utils/extractOfftargets.py | 140 ++++++++++++++----------
 1 file changed, 85 insertions(+), 55 deletions(-)

diff --git a/src/crackling/utils/extractOfftargets.py b/src/crackling/utils/extractOfftargets.py
index 74bb39f..a96dd79 100644
--- a/src/crackling/utils/extractOfftargets.py
+++ b/src/crackling/utils/extractOfftargets.py
@@ -15,7 +15,7 @@
 '''
 
-import glob, multiprocessing, os, re, shutil, string, sys, tempfile, heapq
+import glob, multiprocessing, os, re, shutil, sys, tempfile, heapq, argparse
 
 from crackling.Helpers import *
 from crackling.Paginator import Paginator
 
@@ -23,19 +23,6 @@
 pattern_forward_offsite = r"(?=([ACG][ACGT]{19}[ACGT][AG]G))"
 pattern_reverse_offsite = r"(?=(C[CT][ACGT][ACGT]{19}[TGC]))"
 
-# The off-target sites need to be sorted so the ISSL index is space-optimised.
-# In some cases, there are many files to sort, we can paginate the files.
-# How many files should we sort per page?
-# This is dictated by the number of arguments that we can pass to `sort`
-# Default: 50
-SORT_PAGE_SIZE = 500
-
-# Set the number of processes to generate.
-# This sets the --threads argument for `sort`, and
-# the process count for multiprocessing.Map(..)
-# Default: os.cpu_count()
-PROCESSES_COUNT = os.cpu_count()
-
 def explodeMultiFastaFile(fpInput, fpOutputTempDir):
     newFilesPaths = []
 
@@ -103,7 +90,6 @@
             seqsByHeader[header].append(line.rstrip().upper())
 
     # For each FASTA sequence
-    lineNumber = 0
     offtargets = []
     for header in seqsByHeader:
         seq = ''.join(seqsByHeader[header])
@@ -118,10 +104,9 @@
                 offtargets.append(
                     seqModifier(match_chr[i][0:20])
                 )
-
-        lineNumber += 1
 
     outFile.write(''.join(f'{offTarget}\n' for offTarget in offtargets))
+    return len(offtargets)
 
 # Node function that sorts a file for multiprocessing pool
 def sortingNode(fileToSort, sortedTempDir):
@@ -133,16 +118,16 @@
     )
     # Sort input file and store in new output dir
     with open(fileToSort, 'r') as input:
-    # Read 'page'
-    page = input.readlines()
-    # Sort page
-    page.sort()
-    # Write sorted page to file
-    sortedFile.writelines(page)
-    # Close sorted file
-    sortedFile.close()
-
-def paginatedSort(filesToSort, fpOutput, mpPool):
+        # Read 'page'
+        page = input.readlines()
+        # Sort page
+        page.sort()
+        # Write sorted page to file
+        sortedFile.writelines(page)
+        # Close sorted file
+        sortedFile.close()
+
+def paginatedSort(filesToSort, fpOutput, mpPool, maxNumOpenFiles=400):
     # Create temp file directory
     sortedTempDir = tempfile.TemporaryDirectory()
     printer(f'Created temp directory {sortedTempDir.name} for sorting')
@@ -161,6 +146,8 @@
         args
     )
 
+    printer('Sorting of each file completed')
+
     # Collect sorted files to merge
     sortedFiles = glob.glob(
         os.path.join(
@@ -170,20 +157,43 @@
     )
 
     # Open all the sorted files to merge
-    sortedFilesPointers = [open(file, 'r') for file in sortedFiles]
-
-    # Merge all the sorted files
-    with open(fpOutput, 'w') as f:
-        f.writelines(heapq.merge(*sortedFilesPointers))
+    printer(f'Beginning to merge sorted files, {maxNumOpenFiles:,} at a time')
+    while len(sortedFiles) > 1:
+        # A file to write the merged sequences to
+        mergedFile = tempfile.NamedTemporaryFile(delete = False)
+
+        # Select the files to merge
+        while True:
+            try:
+                sortedFilesPointers = [open(file, 'r') for file in sortedFiles[:maxNumOpenFiles]]
+                break
+            except OSError as e:
+                if e.errno == 24:
+                    printer('Attempted to open too many files at once (OSError errno 24)')
+                    maxNumOpenFiles = max(1, int(maxNumOpenFiles / 2))
+                    printer(f'Reducing the number of files that can be opened by half to {maxNumOpenFiles}')
+                    continue
+                raise e
+
+        printer(f'Merging {len(sortedFilesPointers):,} files')
+
+        # Merge and write
+        with open(mergedFile.name, 'w') as f:
+            f.writelines(heapq.merge(*sortedFilesPointers))
+
+        # Close all of the open files
+        for file in sortedFilesPointers:
+            file.close()
+
+        # Prepare for the next set to be merged
+        sortedFiles = sortedFiles[maxNumOpenFiles:] + [mergedFile.name]
 
-    # Close all of the sorted files
-    for file in sortedFilesPointers:
-        file.close()
+    shutil.move(sortedFiles[0], fpOutput)
 
-def startMultiprocessing(fpInputs, fpOutput, mpPool):
+def startMultiprocessing(fpInputs, fpOutput, mpPool, numThreads, maxOpenFiles):
     printer('Extracting off-targets using multiprocessing approach')
-    printer(f'Allowed processes: {PROCESSES_COUNT}')
+    printer(f'Allowed processes: {numThreads}')
 
     fpTempDir = tempfile.TemporaryDirectory()
     printer(f'Created a temporary directory for intermediate files: {fpTempDir.name}')
 
@@ -218,17 +228,16 @@
         )
         for fpInput in fpInputs
     ]
-    printer(f'Beginning to process {len(args)} files...')
+    printer(f'Beginning to process {len(args)} files')
 
-    mpPool.starmap(
+    numTargets = mpPool.starmap(
         processingNode,
         args
     )
 
-    printer('Processing completed')
+    printer(f'Processing completed. Found {sum(numTargets):,} targets.')
 
     printer('Preparing for ISSL by sorting all intermediate files')
-    printer('Then, writing to user-specified output file')
 
     # sort in batches
     paginatedSort(
@@ -239,25 +248,46 @@
             )
         ),
         fpOutput,
-        mpPool
+        mpPool,
+        maxNumOpenFiles=maxOpenFiles
     )
 
 def main():
-    if (len(sys.argv) < 3):
-        print('Error!')
-        print('Expecting: ExtractOfftargets.py <output-file> [<input-files>... | <input-dir>]')
-        print('\n')
-        exit()
+    parser = argparse.ArgumentParser(description='Extract CRISPR off-target sites for Crackling.')
+
+    parser.add_argument('output',
+        help='A file to write the off-targets to.'
+    )
+    parser.add_argument('inputs',
+        help='A space-separated list of paths, or a path containing wildcards, to FASTA files.',
+        nargs='+'
+    )
+    parser.add_argument('--maxOpenFiles',
+        help='The number of files allowed to be opened by a single process. Use `ulimit -n` to find your system setting.',
+        type=int,
+        default=1000,
+        required=False
+    )
+    parser.add_argument('--threads',
+        help='The number of threads to use. Default is `os.cpu_count()`.',
+        type=int,
+        default=os.cpu_count(),
+        required=False
+    )
+
+    args = parser.parse_args()
 
     # Create multiprocessing pool
     # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
-    mpPool = multiprocessing.Pool(PROCESSES_COUNT)
-
-    fpOutput = sys.argv[1]
-
-    fpInputs = sys.argv[2:]
-
-    startMultiprocessing(fpInputs, fpOutput, mpPool)
+    mpPool = multiprocessing.Pool(args.threads)
+
+    startMultiprocessing(
+        args.inputs,
+        args.output,
+        mpPool,
+        args.threads,
+        args.maxOpenFiles
+    )
 
     # Clean up. Close multiprocessing pool
     mpPool.close()
@@ -265,4 +295,4 @@
     printer('Goodbye.')
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()