diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c7ffd4e4..678c04e73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +### Release 2.4.6 (January 13, 2023) +* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode + ### Release 2.4.5 (January 04, 2023) * [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request diff --git a/README.md b/README.md index 160ab682f..e85aa4dc2 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,9 @@ The recommended way to use the KCL for Java is to consume it from Maven. ## Release Notes +### Release 2.4.6 (January 13, 2023) +* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode + ### Release 2.4.5 (January 04, 2023) * [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml index 41e5e0f90..887211c79 100644 --- a/amazon-kinesis-client-multilang/pom.xml +++ b/amazon-kinesis-client-multilang/pom.xml @@ -21,7 +21,7 @@ amazon-kinesis-client-pom software.amazon.kinesis - 2.4.5 + 2.4.6 4.0.0 diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 77250400b..8d776d0d3 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -22,7 +22,7 @@ software.amazon.kinesis amazon-kinesis-client-pom - 2.4.5 + 2.4.6 amazon-kinesis-client diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java new file mode 100644 index 000000000..d0d332d9c --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java @@ -0,0 +1,38 @@ +package software.amazon.kinesis.coordinator; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import software.amazon.kinesis.common.StreamIdentifier; + +/** + * This class is used for storing in-memory set of streams which are no longer existing (deleted) and needs to be + * cleaned up from KCL's in memory state. + */ +@Slf4j +public class DeletedStreamListProvider { + + private final Set deletedStreams; + + public DeletedStreamListProvider() { + deletedStreams = ConcurrentHashMap.newKeySet(); + } + + public void add(StreamIdentifier streamIdentifier) { + log.info("Added {}", streamIdentifier); + deletedStreams.add(streamIdentifier); + } + + /** + * Method returns and empties the current set of streams + * @return set of deleted Streams + */ + public Set purgeAllDeletedStream() { + final Set response = new HashSet<>(deletedStreams); + deletedStreams.removeAll(response); + return response; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5d9f73e9d..5b9510298 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -41,7 +41,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -54,7 +53,6 @@ import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; -import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; @@ -75,7 +73,6 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; -import software.amazon.kinesis.leases.exceptions.LeasingException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.LifecycleConfig; import software.amazon.kinesis.lifecycle.ShardConsumer; @@ -121,6 +118,7 @@ public class Scheduler implements Runnable { private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; + private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count"; private SchedulerLog slog = new SchedulerLog(); @@ -173,6 +171,8 @@ public class Scheduler implements Runnable { private final LeaseCleanupManager leaseCleanupManager; private final SchemaRegistryDecoder schemaRegistryDecoder; + private final DeletedStreamListProvider deletedStreamListProvider; + // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>(); @@ -263,9 +263,10 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig, this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); + this.deletedStreamListProvider = new DeletedStreamListProvider(); this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, isMultiStreamMode) - .createShardSyncTaskManager(this.metricsFactory, streamConfig); + .createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -558,6 +559,19 @@ Set checkAndSyncStreamShardsAndLeases() .partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); + // These are the streams which are deleted in Kinesis and we encounter resource not found during + // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will + // not have any data. + // Filter streams based on newStreamConfigMap so that we don't override input to KCL in any case. + final Set deletedStreamSet = this.deletedStreamListProvider + .purgeAllDeletedStream() + .stream() + .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier)) + .collect(Collectors.toSet()); + if (deletedStreamSet.size() > 0) { + log.info("Stale streams to delete: {}", deletedStreamSet); + staleStreamIdsToBeDeleted.addAll(deletedStreamSet); + } final Set deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted); streamsSynced.addAll(deletedStreamsLeases); @@ -577,6 +591,8 @@ Set checkAndSyncStreamShardsAndLeases() MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY); MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(), MetricsLevel.SUMMARY); + MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(), + MetricsLevel.SUMMARY); MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY); } finally { MetricsUtil.endScope(metricsScope); @@ -614,6 +630,7 @@ private void removeStreamsFromStaleStreamsList(Set streamIdent private Set deleteMultiStreamLeases(Set streamIdentifiers) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + log.info("Deleting streams: {}", streamIdentifiers); final Set streamsSynced = new HashSet<>(); List leases = null; Map> streamIdToShardsMap = null; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 4f677524e..0166fc5e0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -17,6 +17,7 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; @@ -47,6 +49,7 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -56,6 +59,7 @@ import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static java.util.Objects.nonNull; import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; /** @@ -72,6 +76,8 @@ public class HierarchicalShardSyncer { private final String streamIdentifier; + private final DeletedStreamListProvider deletedStreamListProvider; + private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); private static final int retriesForCompleteHashRange = 3; @@ -79,13 +85,17 @@ public class HierarchicalShardSyncer { private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; public HierarchicalShardSyncer() { - isMultiStreamMode = false; - streamIdentifier = "SingleStreamMode"; + this(false, "SingleStreamMode"); } public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) { + this(isMultiStreamMode, streamIdentifier, null); + } + + public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) { this.isMultiStreamMode = isMultiStreamMode; this.streamIdentifier = streamIdentifier; + this.deletedStreamListProvider = deletedStreamListProvider; } private static final BiFunction shardIdFromLeaseDeducer = @@ -306,8 +316,17 @@ private static List getShardListAtInitialPosition(@NonNull final ShardDet + retriesForCompleteHashRange + " retries."); } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final Optional> shards = Optional.of(shardDetector.listShards()); + private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + // Fallback to existing behavior for backward compatibility + List shardList = Collections.emptyList(); + try { + shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException(); + } catch (ResourceNotFoundException e) { + if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) { + deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier)); + } + } + final Optional> shards = Optional.of(shardList); return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 189ba18b0..2d2ffefc0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -80,6 +80,8 @@ public class KinesisShardDetector implements ShardDetector { @Getter(AccessLevel.PACKAGE) private AtomicInteger cacheMisses = new AtomicInteger(0); + private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true; + @Deprecated public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, @@ -159,15 +161,26 @@ public List listShards() { return listShardsWithFilter(null); } + @Override + @Synchronized + public List listShardsWithoutConsumingResourceNotFoundException() { + return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION); + } + @Override @Synchronized public List listShardsWithFilter(ShardFilter shardFilter) { + return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION); + } + + private List listShardsWithFilterInternal(ShardFilter shardFilter, + boolean shouldPropagateResourceNotFoundException) { final List shards = new ArrayList<>(); ListShardsResponse result; String nextToken = null; do { - result = listShards(shardFilter, nextToken); + result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException); if (result == null) { /* @@ -185,7 +198,12 @@ public List listShardsWithFilter(ShardFilter shardFilter) { return shards; } - private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) { + /** + * @param shouldPropagateResourceNotFoundException : used to determine if ResourceNotFoundException should be + * handled by method and return Empty list or propagate the exception. + */ + private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken, + final boolean shouldPropagateResourceNotFoundException) { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(ResourceNotFoundException.class, t -> t); exceptionManager.add(LimitExceededException.class, t -> t); @@ -233,9 +251,14 @@ private ListShardsResponse listShards(ShardFilter shardFilter, final String next } catch (ResourceNotFoundException e) { log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.", streamIdentifier.streamName()); - return ListShardsResponse.builder().shards(Collections.emptyList()) - .nextToken(null) - .build(); + if (shouldPropagateResourceNotFoundException) { + throw e; + } + return ListShardsResponse.builder() + .shards(Collections.emptyList()) + .nextToken(null) + .build(); + } catch (TimeoutException te) { throw new RuntimeException(te); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index ecf9b3900..9f2e5f949 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.metrics.MetricsFactory; @@ -31,6 +32,11 @@ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFa throw new UnsupportedOperationException(); } + default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig, + DeletedStreamListProvider deletedStreamListProvider) { + throw new UnsupportedOperationException("createShardSyncTaskManager method not implemented"); + } + DynamoDBLeaseRefresher createLeaseRefresher(); ShardDetector createShardDetector(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 62b938551..32514eb57 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -46,6 +46,16 @@ public interface ShardDetector { */ List listShards(); + /** + * This method behaves exactly similar to listShards except the fact that this does not consume and throw + * ResourceNotFoundException instead of returning empty list. + * + * @return Shards + */ + default List listShardsWithoutConsumingResourceNotFoundException() { + throw new UnsupportedOperationException("listShardsWithoutConsumingResourceNotFoundException not implemented"); + } + /** * List shards with shard filter. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index ad1a23005..6bf2ff39a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -29,6 +29,7 @@ import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCleanupManager; @@ -504,6 +505,20 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac */ @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { + return createShardSyncTaskManager(metricsFactory, streamConfig, null); + } + + /** + * Create ShardSyncTaskManager from the streamConfig passed + * + * @param metricsFactory - factory to get metrics object + * @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created + * @param deletedStreamListProvider - store for capturing the streams which are deleted in kinesis + * @return ShardSyncTaskManager + */ + @Override + public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig, + DeletedStreamListProvider deletedStreamListProvider) { return new ShardSyncTaskManager(this.createShardDetector(streamConfig), this.createLeaseRefresher(), streamConfig.initialPositionInStreamExtended(), @@ -511,10 +526,12 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac ignoreUnexpectedChildShards, shardSyncIntervalMillis, executorService, - new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()), + new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(), + deletedStreamListProvider), metricsFactory); } + @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index 53739e407..85ed94c69 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -46,7 +46,7 @@ public class RetrievalConfig { */ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java"; - public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.5"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.6"; /** * Client used to make calls to Kinesis for records retrieval diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index aa9f8412b..e0ad84261 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -39,6 +39,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -728,16 +729,8 @@ public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeleted private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion, boolean onlyStreamsNoLeasesDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { - List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( - StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) - .collect(Collectors.toCollection(LinkedList::new)); - List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( - StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) - .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList1 = createDummyStreamConfigList(1,5); + List streamConfigList2 = createDummyStreamConfigList(3,7); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); @@ -784,6 +777,91 @@ private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDelete scheduler.staleStreamDeletionMap().keySet()); } + + @Test + public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { + List streamConfigList1 = createDummyStreamConfigList(1,6); + List streamConfigList2 = createDummyStreamConfigList(1,4); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + + prepareForStaleDeletedStreamCleanupTests(); + + // when KCL starts it starts with tracking 5 stream + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + + // 2 Streams are no longer needed to be consumed + Set syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(createDummyStreamConfigList(4, 6).stream() + .map(StreamConfig::streamIdentifier) + .collect(Collectors.toSet()), scheduler.staleStreamDeletionMap() + .keySet()); + assertEquals(0, syncedStreams1.size()); + + StreamConfig deletedStreamConfig = createDummyStreamConfig(5); + // One stream is deleted from Kinesis side + scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier()); + + Set syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases(); + + Set expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1); + expectedCurrentStreamConfigs.remove(deletedStreamConfig); + + //assert kinesis deleted stream is cleaned up from KCL in memory state. + assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()), + Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet())); + assertEquals(1, syncedStreams2.size()); + assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size()); + + verify(multiStreamTracker, times(3)).streamConfigList(); + + } + + private void prepareForStaleDeletedStreamCleanupTests() { + + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofDays(1); + } + }); + + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + } + // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker + @Test + public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() + throws ProvisionedThroughputException, InvalidStateException, DependencyException { + List streamConfigList1 = createDummyStreamConfigList(1,6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); + prepareForStaleDeletedStreamCleanupTests(); + + scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); + + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + + assertEquals(0, syncedStreams.size()); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + //Creates list of upperBound-lowerBound no of dummy StreamConfig + private List createDummyStreamConfigList(int lowerBound, int upperBound) { + return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig) + .collect(Collectors.toCollection(LinkedList::new)); + } + private StreamConfig createDummyStreamConfig(int streamId){ + return new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + } + @Test public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod() throws DependencyException, ProvisionedThroughputException, InvalidStateException { @@ -1116,7 +1194,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, - StreamConfig streamConfig) { + StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) { if(shouldReturnDefaultShardSyncTaskmanager) { return shardSyncTaskManager; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index c390987c7..02fccaad7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -19,6 +19,7 @@ // import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; @@ -55,6 +56,7 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; @@ -63,9 +65,12 @@ import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.NullMetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -294,7 +299,7 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exceptio final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); @@ -317,7 +322,8 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exceptio extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -329,7 +335,7 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() thro final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); @@ -352,7 +358,8 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() thro extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -365,7 +372,7 @@ private List toMultiStreamLeaseList(List shardIdBasedLeases) { /** * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. */ @Test public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { @@ -398,13 +405,14 @@ public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } /** * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. */ @Test public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception { @@ -435,13 +443,14 @@ public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws E extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } /** * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. */ @Test public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { @@ -468,6 +477,7 @@ public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Excepti assertThat(extendedSequenceNumbers.size(), equalTo(0)); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -657,13 +667,13 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() throws Except shards.remove(3); shards.add(3, shard); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).listLeases(); } } @@ -677,14 +687,14 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream() shards.remove(3); shards.add(3, shard); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).listLeases(); } } @@ -711,7 +721,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); @@ -732,7 +742,8 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -756,7 +767,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); @@ -777,7 +788,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -811,7 +822,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); @@ -826,7 +837,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -841,7 +852,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe assertThat(deleteLeases.size(), equalTo(0)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); } @@ -877,7 +888,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()) .thenThrow(new DependencyException(new Throwable("Throw for ListLeases"))) .thenReturn(Collections.emptyList()).thenReturn(leases); @@ -889,7 +900,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -904,7 +915,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -919,13 +930,36 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc final Set expectedSequenceNumbers = new HashSet<>( Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - verify(shardDetector, times(3)).listShards(); + verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } } + @Test + public void testDeletedStreamListProviderUpdateOnResourceNotFound() + throws ProvisionedThroughputException, InvalidStateException, DependencyException, InterruptedException { + DeletedStreamListProvider dummyDeletedStreamListProvider = new DeletedStreamListProvider(); + hierarchicalShardSyncer = new HierarchicalShardSyncer(MULTISTREAM_MODE_ON, STREAM_IDENTIFIER, + dummyDeletedStreamListProvider); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenThrow( + ResourceNotFoundException.builder() + .build()); + boolean response = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, + INITIAL_POSITION_TRIM_HORIZON, SCOPE, ignoreUnexpectedChildShards, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + Set deletedStreamSet = dummyDeletedStreamListProvider.purgeAllDeletedStream(); + + assertFalse(response); + assertThat(deletedStreamSet.size(), equalTo(1)); + assertThat(deletedStreamSet.iterator().next().toString(), equalTo(STREAM_IDENTIFIER)); + + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); + verify(shardDetector, never()).listShards(); + } + @Test(expected = DependencyException.class) public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions() throws Exception { @@ -957,7 +991,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()) .thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())) @@ -969,7 +1003,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -983,7 +1017,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size())) .createLeaseIfNotExists(any(Lease.class)); @@ -994,7 +1028,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - verify(shardDetector, times(3)).listShards(); + verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size())) .createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); @@ -1077,7 +1111,7 @@ private void testCheckAndCreateLeaseForShardsIfMissing(final List shards, final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition)); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty()); @@ -2296,14 +2330,14 @@ public void testNonEmptyLeaseTableUsesListShards() throws Exception { final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); - when(shardDetector.listShards()).thenReturn(shardsWithoutLeases); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shardsWithoutLeases); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - verify(shardDetector, atLeast(1)).listShards(); + verify(shardDetector, atLeast(1)).listShardsWithoutConsumingResourceNotFoundException(); } /** diff --git a/pom.xml b/pom.xml index 59feefef2..4bb86c1b8 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ amazon-kinesis-client-pom pom Amazon Kinesis Client Library - 2.4.5 + 2.4.6 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.