From f6763bae82a56d21e765db8b579bcd0064d2e64a Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 5 Dec 2024 10:30:05 -0600 Subject: [PATCH 1/4] Fail the run if publish thread pool times out Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/Session.groovy | 14 +++++++++++--- .../nextflow/util/ThreadPoolHelper.groovy | 9 ++++----- .../nextflow/util/ThreadPoolManager.groovy | 15 +++++++++------ .../cloud/aws/batch/AwsBatchExecutor.groovy | 5 +++-- .../main/nextflow/cloud/aws/nio/S3Client.java | 14 -------------- .../nextflow/cloud/aws/nio/S3OutputStream.java | 18 ------------------ 6 files changed, 27 insertions(+), 48 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f394245259..1432f53051 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -682,15 +682,23 @@ class Session implements ISession { if( !aborted ) { joinAllOperators() log.trace "Session > all operators finished" + final finalizerComplete = finalizePoolManager?.shutdown(false) + final publisherComplete = publishPoolManager?.shutdown(false) + if( !finalizerComplete || !publisherComplete ) { + final failOnIncomplete = config.navigate('workflow.output.ignoreErrors') + if( failOnIncomplete ) + throw new AbortOperationException("Timed out while waiting to publish outputs") + } + } + else { + finalizePoolManager?.shutdown(true) + publishPoolManager?.shutdown(true) } } void destroy() { try { log.trace "Session > destroying" - // shutdown thread pools - finalizePoolManager?.shutdown(aborted) - publishPoolManager?.shutdown(aborted) // invoke shutdown callbacks shutdown0() log.trace "Session > after cleanup" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 12c736bd60..7e5ba62c35 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer @Slf4j class ThreadPoolHelper { - static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) { + static boolean await(ExecutorService pool, Duration maxAwait, String waitMessage) { final max = maxAwait.millis final t0 = System.currentTimeMillis() // wait for ongoing file transfer to complete @@ -45,10 +45,8 @@ class ThreadPoolHelper { break final delta = System.currentTimeMillis()-t0 - if( delta > max ) { - log.warn(exitMsg) - break - } + if( delta > max ) + return false // log to console every 10 minutes (120 * 5 sec) if( count % 120 == 0 ) { @@ -61,6 +59,7 @@ class ThreadPoolHelper { // increment the count count++ } + return true } static protected int pending(ExecutorService pool) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy index d1ea726f74..96af1ec1b6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy @@ -109,24 +109,27 @@ class ThreadPoolManager { return result } - void shutdown(ISession session) { + boolean shutdown(ISession session) { final sess = (Session) session - shutdown( sess != null && sess.aborted ) + return shutdown( sess != null && sess.aborted ) } - void shutdown(boolean hard) { + boolean shutdown(boolean hard) { if( !executorService ) - return + return true if( hard ) { executorService.shutdownNow() - return + return true } executorService.shutdown() // wait for remaining threads to complete - ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg) + final complete = ThreadPoolHelper.await(executorService, maxAwait, waitMsg) + if( !complete ) + log.warn exitMsg log.debug "Thread pool '$name' shutdown completed (hard=$hard)" + return complete } static ExecutorService create(String name, int maxThreads=0) { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index 1ce8875521..f026c2730a 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -319,8 +319,9 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec // start shutdown process reaper.shutdown() final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)" - final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" - ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + final complete = ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg) + if( !complete ) + log.warn "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" } @Override diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 9661afd657..25aadbb2c8 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) { String getObjectKmsKeyId(String bucketName, String key) { return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId(); } - - protected void showdownTransferPool(boolean hard) { - log.debug("Initiating transfer manager shutdown (hard={})", hard); - if( hard ) { - transferPool.shutdownNow(); - } - else { - // await pool completion - transferPool.shutdown(); - final String waitMsg = "[AWS S3] Waiting files transfer to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(transferPool, Duration.of("1h"), waitMsg, exitMsg); - } - } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java index 56454b186a..eed9e3cda7 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) { return executorSingleton; } - /** - * Shutdown the executor and clear the singleton - */ - static void shutdownExecutor(boolean hard) { - if( hard ) { - executorSingleton.shutdownNow(); - } - else { - executorSingleton.shutdown(); - log.trace("Uploader await completion"); - final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg); - log.trace("Uploader shutdown completed"); - executorSingleton = null; - } - } - } From 19de4d3d86df3a70895de87fe34225a747964651 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 13 Dec 2024 09:19:48 -0600 Subject: [PATCH 2/4] Fix the use of `workflow.output.ignoreErrors` Signed-off-by: Ben Sherman --- modules/nextflow/src/main/groovy/nextflow/Session.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 1432f53051..ac7e7ec3a7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -685,8 +685,8 @@ class Session implements ISession { final finalizerComplete = finalizePoolManager?.shutdown(false) final publisherComplete = publishPoolManager?.shutdown(false) if( !finalizerComplete || !publisherComplete ) { - final failOnIncomplete = config.navigate('workflow.output.ignoreErrors') - if( failOnIncomplete ) + final ignoreErrors = config.navigate('workflow.output.ignoreErrors') + if( !ignoreErrors ) throw new AbortOperationException("Timed out while waiting to publish outputs") } } From c475e7a88eef9e52727f3d0c1fb5785c26eca790 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 13 Dec 2024 10:21:00 -0600 Subject: [PATCH 3/4] Apply suggestions from review Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/Session.groovy | 23 ++++++++++--------- .../nextflow/util/ThreadPoolHelper.groovy | 5 ++-- .../nextflow/util/ThreadPoolManager.groovy | 15 +++++------- .../cloud/aws/batch/AwsBatchExecutor.groovy | 10 +++++--- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index ac7e7ec3a7..41b9539afd 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -682,23 +682,24 @@ class Session implements ISession { if( !aborted ) { joinAllOperators() log.trace "Session > all operators finished" - final finalizerComplete = finalizePoolManager?.shutdown(false) - final publisherComplete = publishPoolManager?.shutdown(false) - if( !finalizerComplete || !publisherComplete ) { - final ignoreErrors = config.navigate('workflow.output.ignoreErrors') - if( !ignoreErrors ) - throw new AbortOperationException("Timed out while waiting to publish outputs") - } - } - else { - finalizePoolManager?.shutdown(true) - publishPoolManager?.shutdown(true) } } void destroy() { try { log.trace "Session > destroying" + // shutdown thread pools + try { + finalizePoolManager?.shutdown(aborted) + publishPoolManager?.shutdown(aborted) + } + catch( TimeoutException e ) { + final ignoreErrors = config.navigate('workflow.output.ignoreErrors', false) + if( !ignoreErrors ) + throw new AbortOperationException("Timed out while waiting to publish outputs") + else + log.warn e.message + } // invoke shutdown callbacks shutdown0() log.trace "Session > after cleanup" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 7e5ba62c35..8f13ea8560 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer @Slf4j class ThreadPoolHelper { - static boolean await(ExecutorService pool, Duration maxAwait, String waitMessage) { + static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) { final max = maxAwait.millis final t0 = System.currentTimeMillis() // wait for ongoing file transfer to complete @@ -46,7 +46,7 @@ class ThreadPoolHelper { final delta = System.currentTimeMillis()-t0 if( delta > max ) - return false + throw new TimeoutException(exitMsg) // log to console every 10 minutes (120 * 5 sec) if( count % 120 == 0 ) { @@ -59,7 +59,6 @@ class ThreadPoolHelper { // increment the count count++ } - return true } static protected int pending(ExecutorService pool) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy index 96af1ec1b6..d1ea726f74 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy @@ -109,27 +109,24 @@ class ThreadPoolManager { return result } - boolean shutdown(ISession session) { + void shutdown(ISession session) { final sess = (Session) session - return shutdown( sess != null && sess.aborted ) + shutdown( sess != null && sess.aborted ) } - boolean shutdown(boolean hard) { + void shutdown(boolean hard) { if( !executorService ) - return true + return if( hard ) { executorService.shutdownNow() - return true + return } executorService.shutdown() // wait for remaining threads to complete - final complete = ThreadPoolHelper.await(executorService, maxAwait, waitMsg) - if( !complete ) - log.warn exitMsg + ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg) log.debug "Thread pool '$name' shutdown completed (hard=$hard)" - return complete } static ExecutorService create(String name, int maxThreads=0) { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index f026c2730a..d079afb6a0 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -319,9 +319,13 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec // start shutdown process reaper.shutdown() final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)" - final complete = ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg) - if( !complete ) - log.warn "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" + final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" + try { + ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + } + catch( TimeoutException e ) { + log.warn e.message + } } @Override From b4aa73d7b47ea6efa1153220748accd37b621546 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 13 Dec 2024 10:25:08 -0600 Subject: [PATCH 4/4] Fix build errors Signed-off-by: Ben Sherman --- modules/nextflow/src/main/groovy/nextflow/Session.groovy | 1 + .../src/main/groovy/nextflow/util/ThreadPoolHelper.groovy | 1 + .../src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy | 1 + 3 files changed, 3 insertions(+) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 41b9539afd..5fc6c436f4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -23,6 +23,7 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException import com.google.common.hash.HashCode import groovy.transform.CompileDynamic diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 8f13ea8560..edb4b083cc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -20,6 +20,7 @@ package nextflow.util import java.util.concurrent.ExecutorService import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index d079afb6a0..23b9bf9c35 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch import java.nio.file.Path import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import com.amazonaws.services.batch.AWSBatch import com.amazonaws.services.batch.model.AWSBatchException