Skip to content

Commit

Permalink
move shutdownComplete call to ShardConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentvilo-aws committed Apr 10, 2024
1 parent b914eef commit 29d5042
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ public ConsumerTask createTask(ShardConsumerArgument argument, ShardConsumer con
argument.shardRecordProcessor(),
argument.recordProcessorCheckpointer(),
consumer.shutdownReason(),
consumer.shutdownNotification(),
argument.initialPositionInStream(),
argument.cleanupLeasesOfCompletedShards(),
argument.ignoreUnexpectedChildShards(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ CompletableFuture<Boolean> shutdownComplete() {
}

executeTask(shardEndProcessRecordsInput);

// call shutdownNotification.shutdownComplete() if shutting down as part of gracefulShutdown
if (currentState.state() == ConsumerStates.ShardConsumerState.SHUTTING_DOWN &&
taskOutcome == TaskOutcome.SUCCESSFUL && shutdownNotification != null) {
shutdownNotification.shutdownComplete();
}
return false;
}
}, executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public class ShutdownTask implements ConsumerTask {
private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
@NonNull
private final ShutdownReason reason;
private final ShutdownNotification shutdownNotification;
@NonNull
private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards;
Expand Down Expand Up @@ -151,11 +150,6 @@ public TaskResult call() {
log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
recordsPublisher.shutdown();

// shutdownNotification is only set and used when gracefulShutdown starts
if (shutdownNotification != null) {
shutdownNotification.shutdownComplete();
}

log.debug("Record processor completed shutdown() for shard {}", leaseKey);

return new TaskResult(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,9 @@ public void testRequestedShutdownWhileQuiet() throws Exception {
shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
// No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome.

// shutdownNotification.shutdownComplete() should only be called for gracefulShutdown
verify(shutdownNotification, times(1)).shutdownComplete();

verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,18 +310,6 @@ public void testNullChildShards() throws Exception {
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}

/**
* shutdownNotification is only set when ShardConsumer.gracefulShutdown() is called and should be null otherwise.
* The task should still call recordsPublisher.shutdown() regardless of the notification
*/
@Test
public void testCallWhenShutdownNotificationIsSet() {
final TaskResult result = createShutdownTaskWithNotification(LEASE_LOST, Collections.emptyList()).call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shutdownNotification).shutdownComplete();
}

@Test
public void testCallWhenShutdownNotificationIsNull() {
final TaskResult result = createShutdownTask(LEASE_LOST, Collections.emptyList()).call();
Expand Down Expand Up @@ -394,15 +382,7 @@ private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<
private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<ChildShard> childShards,
final ShardInfo shardInfo) {
return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
reason, null, INITIAL_POSITION_TRIM_HORIZON, false, false,
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
}

private ShutdownTask createShutdownTaskWithNotification(final ShutdownReason reason,
final List<ChildShard> childShards) {
return new ShutdownTask(SHARD_INFO, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
reason, shutdownNotification, INITIAL_POSITION_TRIM_HORIZON, false, false,
reason, INITIAL_POSITION_TRIM_HORIZON, false, false,
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
}
Expand Down

0 comments on commit 29d5042

Please sign in to comment.