diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c7ffd4e4..678c04e73 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog
+### Release 2.4.6 (January 13, 2023)
+* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode
+
### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request
diff --git a/README.md b/README.md
index 160ab682f..e85aa4dc2 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,9 @@ The recommended way to use the KCL for Java is to consume it from Maven.
## Release Notes
+### Release 2.4.6 (January 13, 2023)
+* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode
+
### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request
diff --git a/amazon-kinesis-client-multilang/pom.xml b/amazon-kinesis-client-multilang/pom.xml
index 41e5e0f90..887211c79 100644
--- a/amazon-kinesis-client-multilang/pom.xml
+++ b/amazon-kinesis-client-multilang/pom.xml
@@ -21,7 +21,7 @@
amazon-kinesis-client-pom
software.amazon.kinesis
- 2.4.5
+ 2.4.6
4.0.0
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 77250400b..8d776d0d3 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
software.amazon.kinesis
amazon-kinesis-client-pom
- 2.4.5
+ 2.4.6
amazon-kinesis-client
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java
new file mode 100644
index 000000000..d0d332d9c
--- /dev/null
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java
@@ -0,0 +1,38 @@
+package software.amazon.kinesis.coordinator;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+import software.amazon.kinesis.common.StreamIdentifier;
+
+/**
+ * This class is used for storing in-memory set of streams which are no longer existing (deleted) and needs to be
+ * cleaned up from KCL's in memory state.
+ */
+@Slf4j
+public class DeletedStreamListProvider {
+
+ private final Set deletedStreams;
+
+ public DeletedStreamListProvider() {
+ deletedStreams = ConcurrentHashMap.newKeySet();
+ }
+
+ public void add(StreamIdentifier streamIdentifier) {
+ log.info("Added {}", streamIdentifier);
+ deletedStreams.add(streamIdentifier);
+ }
+
+ /**
+ * Method returns and empties the current set of streams
+ * @return set of deleted Streams
+ */
+ public Set purgeAllDeletedStream() {
+ final Set response = new HashSet<>(deletedStreams);
+ deletedStreams.removeAll(response);
+ return response;
+ }
+}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
index 5d9f73e9d..5b9510298 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
@@ -41,7 +41,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -54,7 +53,6 @@
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
-import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@@ -75,7 +73,6 @@
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
-import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@@ -121,6 +118,7 @@ public class Scheduler implements Runnable {
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
+ private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private SchedulerLog slog = new SchedulerLog();
@@ -173,6 +171,8 @@ public class Scheduler implements Runnable {
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
+ private final DeletedStreamListProvider deletedStreamListProvider;
+
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>();
@@ -263,9 +263,10 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
+ this.deletedStreamListProvider = new DeletedStreamListProvider();
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
- .createShardSyncTaskManager(this.metricsFactory, streamConfig);
+ .createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@@ -558,6 +559,19 @@ Set checkAndSyncStreamShardsAndLeases()
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
+ // These are the streams which are deleted in Kinesis and we encounter resource not found during
+ // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will
+ // not have any data.
+ // Filter streams based on newStreamConfigMap so that we don't override input to KCL in any case.
+ final Set deletedStreamSet = this.deletedStreamListProvider
+ .purgeAllDeletedStream()
+ .stream()
+ .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier))
+ .collect(Collectors.toSet());
+ if (deletedStreamSet.size() > 0) {
+ log.info("Stale streams to delete: {}", deletedStreamSet);
+ staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
+ }
final Set deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
streamsSynced.addAll(deletedStreamsLeases);
@@ -577,6 +591,8 @@ Set checkAndSyncStreamShardsAndLeases()
MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
MetricsLevel.SUMMARY);
+ MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(),
+ MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
} finally {
MetricsUtil.endScope(metricsScope);
@@ -614,6 +630,7 @@ private void removeStreamsFromStaleStreamsList(Set streamIdent
private Set deleteMultiStreamLeases(Set streamIdentifiers)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
+ log.info("Deleting streams: {}", streamIdentifiers);
final Set streamsSynced = new HashSet<>();
List leases = null;
Map> streamIdToShardsMap = null;
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
index 4f677524e..0166fc5e0 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -17,6 +17,7 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,6 +40,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
@@ -47,6 +49,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@@ -56,6 +59,7 @@
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
+import static java.util.Objects.nonNull;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/**
@@ -72,6 +76,8 @@ public class HierarchicalShardSyncer {
private final String streamIdentifier;
+ private final DeletedStreamListProvider deletedStreamListProvider;
+
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;
@@ -79,13 +85,17 @@ public class HierarchicalShardSyncer {
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
public HierarchicalShardSyncer() {
- isMultiStreamMode = false;
- streamIdentifier = "SingleStreamMode";
+ this(false, "SingleStreamMode");
}
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
+ this(isMultiStreamMode, streamIdentifier, null);
+ }
+
+ public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
+ this.deletedStreamListProvider = deletedStreamListProvider;
}
private static final BiFunction shardIdFromLeaseDeducer =
@@ -306,8 +316,17 @@ private static List getShardListAtInitialPosition(@NonNull final ShardDet
+ retriesForCompleteHashRange + " retries.");
}
- private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
- final Optional> shards = Optional.of(shardDetector.listShards());
+ private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
+ // Fallback to existing behavior for backward compatibility
+ List shardList = Collections.emptyList();
+ try {
+ shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException();
+ } catch (ResourceNotFoundException e) {
+ if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) {
+ deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
+ }
+ }
+ final Optional> shards = Optional.of(shardList);
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
index 189ba18b0..2d2ffefc0 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -80,6 +80,8 @@ public class KinesisShardDetector implements ShardDetector {
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
+ private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;
+
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
@@ -159,15 +161,26 @@ public List listShards() {
return listShardsWithFilter(null);
}
+ @Override
+ @Synchronized
+ public List listShardsWithoutConsumingResourceNotFoundException() {
+ return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
+ }
+
@Override
@Synchronized
public List listShardsWithFilter(ShardFilter shardFilter) {
+ return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION);
+ }
+
+ private List listShardsWithFilterInternal(ShardFilter shardFilter,
+ boolean shouldPropagateResourceNotFoundException) {
final List shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
- result = listShards(shardFilter, nextToken);
+ result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException);
if (result == null) {
/*
@@ -185,7 +198,12 @@ public List listShardsWithFilter(ShardFilter shardFilter) {
return shards;
}
- private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
+ /**
+ * @param shouldPropagateResourceNotFoundException : used to determine if ResourceNotFoundException should be
+ * handled by method and return Empty list or propagate the exception.
+ */
+ private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken,
+ final boolean shouldPropagateResourceNotFoundException) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
@@ -233,9 +251,14 @@ private ListShardsResponse listShards(ShardFilter shardFilter, final String next
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
- return ListShardsResponse.builder().shards(Collections.emptyList())
- .nextToken(null)
- .build();
+ if (shouldPropagateResourceNotFoundException) {
+ throw e;
+ }
+ return ListShardsResponse.builder()
+ .shards(Collections.emptyList())
+ .nextToken(null)
+ .build();
+
} catch (TimeoutException te) {
throw new RuntimeException(te);
}
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
index ecf9b3900..9f2e5f949 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java
@@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.kinesis.common.StreamConfig;
+import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;
@@ -31,6 +32,11 @@ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFa
throw new UnsupportedOperationException();
}
+ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
+ DeletedStreamListProvider deletedStreamListProvider) {
+ throw new UnsupportedOperationException("createShardSyncTaskManager method not implemented");
+ }
+
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
index 62b938551..32514eb57 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
@@ -46,6 +46,16 @@ public interface ShardDetector {
*/
List listShards();
+ /**
+ * This method behaves exactly similar to listShards except the fact that this does not consume and throw
+ * ResourceNotFoundException instead of returning empty list.
+ *
+ * @return Shards
+ */
+ default List listShardsWithoutConsumingResourceNotFoundException() {
+ throw new UnsupportedOperationException("listShardsWithoutConsumingResourceNotFoundException not implemented");
+ }
+
/**
* List shards with shard filter.
*
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
index ad1a23005..6bf2ff39a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -29,6 +29,7 @@
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@@ -504,6 +505,20 @@ public ShardSyncTaskManager createShardSyncTaskManager(@NonNull final MetricsFac
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
+ return createShardSyncTaskManager(metricsFactory, streamConfig, null);
+ }
+
+ /**
+ * Create ShardSyncTaskManager from the streamConfig passed
+ *
+ * @param metricsFactory - factory to get metrics object
+ * @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created
+ * @param deletedStreamListProvider - store for capturing the streams which are deleted in kinesis
+ * @return ShardSyncTaskManager
+ */
+ @Override
+ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
+ DeletedStreamListProvider deletedStreamListProvider) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
@@ -511,10 +526,12 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
- new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
+ new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(),
+ deletedStreamListProvider),
metricsFactory);
}
+
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
index 53739e407..85ed94c69 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java
@@ -46,7 +46,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
- public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.5";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.6";
/**
* Client used to make calls to Kinesis for records retrieval
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
index aa9f8412b..e0ad84261 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java
@@ -39,6 +39,7 @@
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -728,16 +729,8 @@ public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeleted
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
- List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
- StreamIdentifier.multiStreamInstance(
- Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
- InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
- .collect(Collectors.toCollection(LinkedList::new));
- List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
- StreamIdentifier.multiStreamInstance(
- Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
- InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
- .collect(Collectors.toCollection(LinkedList::new));
+ List streamConfigList1 = createDummyStreamConfigList(1,5);
+ List streamConfigList2 = createDummyStreamConfigList(3,7);
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
@@ -784,6 +777,91 @@ private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDelete
scheduler.staleStreamDeletionMap().keySet());
}
+
+ @Test
+ public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
+ List streamConfigList1 = createDummyStreamConfigList(1,6);
+ List streamConfigList2 = createDummyStreamConfigList(1,4);
+ when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
+
+ prepareForStaleDeletedStreamCleanupTests();
+
+ // when KCL starts it starts with tracking 5 stream
+ assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
+ assertEquals(0, scheduler.staleStreamDeletionMap().size());
+
+ // 2 Streams are no longer needed to be consumed
+ Set syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases();
+ assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
+ assertEquals(createDummyStreamConfigList(4, 6).stream()
+ .map(StreamConfig::streamIdentifier)
+ .collect(Collectors.toSet()), scheduler.staleStreamDeletionMap()
+ .keySet());
+ assertEquals(0, syncedStreams1.size());
+
+ StreamConfig deletedStreamConfig = createDummyStreamConfig(5);
+ // One stream is deleted from Kinesis side
+ scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier());
+
+ Set syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases();
+
+ Set expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1);
+ expectedCurrentStreamConfigs.remove(deletedStreamConfig);
+
+ //assert kinesis deleted stream is cleaned up from KCL in memory state.
+ assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
+ assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()),
+ Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet()));
+ assertEquals(1, syncedStreams2.size());
+ assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size());
+
+ verify(multiStreamTracker, times(3)).streamConfigList();
+
+ }
+
+ private void prepareForStaleDeletedStreamCleanupTests() {
+
+ when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
+ @Override public Duration waitPeriodToDeleteFormerStreams() {
+ return Duration.ofDays(1);
+ }
+ });
+
+ retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
+ .retrievalFactory(retrievalFactory);
+ scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
+ metricsConfig, processorConfig, retrievalConfig));
+ when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
+ }
+ // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker
+ @Test
+ public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
+ throws ProvisionedThroughputException, InvalidStateException, DependencyException {
+ List streamConfigList1 = createDummyStreamConfigList(1,6);
+ when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1);
+ prepareForStaleDeletedStreamCleanupTests();
+
+ scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
+
+ Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
+
+ assertEquals(0, syncedStreams.size());
+ assertEquals(0, scheduler.staleStreamDeletionMap().size());
+ assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
+ }
+
+ //Creates list of upperBound-lowerBound no of dummy StreamConfig
+ private List createDummyStreamConfigList(int lowerBound, int upperBound) {
+ return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig)
+ .collect(Collectors.toCollection(LinkedList::new));
+ }
+ private StreamConfig createDummyStreamConfig(int streamId){
+ return new StreamConfig(
+ StreamIdentifier.multiStreamInstance(
+ Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
+ InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST));
+ }
+
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
@@ -1116,7 +1194,7 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
- StreamConfig streamConfig) {
+ StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) {
if(shouldReturnDefaultShardSyncTaskmanager) {
return shardSyncTaskManager;
}
diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
index c390987c7..02fccaad7 100644
--- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
+++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java
@@ -19,6 +19,7 @@
//
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
@@ -55,6 +56,7 @@
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
@@ -63,9 +65,12 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
+import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
+import software.amazon.kinesis.leases.exceptions.InvalidStateException;
+import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@@ -294,7 +299,7 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exceptio
final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@@ -317,7 +322,8 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exceptio
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
- verify(shardDetector).listShards();
+ verify(shardDetector, never()).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -329,7 +335,7 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() thro
final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@@ -352,7 +358,8 @@ public void testCheckAndCreateLeasesForShardsIfMissingAtLatestMultiStream() thro
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
- verify(shardDetector).listShards();
+ verify(shardDetector, never()).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -365,7 +372,7 @@ private List toMultiStreamLeaseList(List shardIdBasedLeases) {
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
- * should never be called.
+ * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
@@ -398,13 +405,14 @@ public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
+ verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
- * should never be called.
+ * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception {
@@ -435,13 +443,14 @@ public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws E
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
+ verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
- * should never be called.
+ * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
@@ -468,6 +477,7 @@ public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Excepti
assertThat(extendedSequenceNumbers.size(), equalTo(0));
verify(shardDetector, never()).listShards();
+ verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@@ -657,13 +667,13 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpen() throws Except
shards.remove(3);
shards.add(3, shard);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
- verify(shardDetector).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@@ -677,14 +687,14 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenForMultiStream()
shards.remove(3);
shards.add(3, shard);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
- verify(shardDetector).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@@ -711,7 +721,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon
final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@@ -732,7 +742,8 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
- verify(shardDetector).listShards();
+ verify(shardDetector, never()).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@@ -756,7 +767,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon
final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@@ -777,7 +788,7 @@ public void testCheckAndCreateLeasesForNewShardsWhenParentIsOpenAndIgnoringIncon
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
- verify(shardDetector).listShards();
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@@ -811,7 +822,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe
final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
@@ -826,7 +837,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe
assertThat(createLeases, equalTo(expectedCreateLeases));
- verify(shardDetector, times(1)).listShards();
+ verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -841,7 +852,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShard(final ExtendedSe
assertThat(deleteLeases.size(), equalTo(0));
- verify(shardDetector, times(2)).listShards();
+ verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
}
@@ -877,7 +888,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases())
.thenThrow(new DependencyException(new Throwable("Throw for ListLeases")))
.thenReturn(Collections.emptyList()).thenReturn(leases);
@@ -889,7 +900,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
- verify(shardDetector, times(1)).listShards();
+ verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -904,7 +915,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
assertThat(createLeases, equalTo(expectedCreateLeases));
- verify(shardDetector, times(2)).listShards();
+ verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -919,13 +930,36 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithListLeasesExc
final Set expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
- verify(shardDetector, times(3)).listShards();
+ verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
}
+ @Test
+ public void testDeletedStreamListProviderUpdateOnResourceNotFound()
+ throws ProvisionedThroughputException, InvalidStateException, DependencyException, InterruptedException {
+ DeletedStreamListProvider dummyDeletedStreamListProvider = new DeletedStreamListProvider();
+ hierarchicalShardSyncer = new HierarchicalShardSyncer(MULTISTREAM_MODE_ON, STREAM_IDENTIFIER,
+ dummyDeletedStreamListProvider);
+ when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenThrow(
+ ResourceNotFoundException.builder()
+ .build());
+ boolean response = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
+ INITIAL_POSITION_TRIM_HORIZON, SCOPE, ignoreUnexpectedChildShards,
+ dynamoDBLeaseRefresher.isLeaseTableEmpty());
+ Set deletedStreamSet = dummyDeletedStreamListProvider.purgeAllDeletedStream();
+
+ assertFalse(response);
+ assertThat(deletedStreamSet.size(), equalTo(1));
+ assertThat(deletedStreamSet.iterator().next().toString(), equalTo(STREAM_IDENTIFIER));
+
+ verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
+ verify(shardDetector, never()).listShards();
+ }
+
@Test(expected = DependencyException.class)
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions()
throws Exception {
@@ -957,7 +991,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList())
.thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture()))
@@ -969,7 +1003,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
- verify(shardDetector, times(1)).listShards();
+ verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@@ -983,7 +1017,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
final Set expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position);
assertThat(createLeases, equalTo(expectedCreateLeases));
- verify(shardDetector, times(2)).listShards();
+ verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
@@ -994,7 +1028,7 @@ private void testCheckAndCreateLeasesForNewShardsAndClosedShardWithCreateLeaseEx
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
- verify(shardDetector, times(3)).listShards();
+ verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
@@ -1077,7 +1111,7 @@ private void testCheckAndCreateLeaseForShardsIfMissing(final List shards,
final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class);
- when(shardDetector.listShards()).thenReturn(shards);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition));
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty());
@@ -2296,14 +2330,14 @@ public void testNonEmptyLeaseTableUsesListShards() throws Exception {
final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
- when(shardDetector.listShards()).thenReturn(shardsWithoutLeases);
+ when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shardsWithoutLeases);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
- verify(shardDetector, atLeast(1)).listShards();
+ verify(shardDetector, atLeast(1)).listShardsWithoutConsumingResourceNotFoundException();
}
/**
diff --git a/pom.xml b/pom.xml
index 59feefef2..4bb86c1b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
amazon-kinesis-client-pom
pom
Amazon Kinesis Client Library
- 2.4.5
+ 2.4.6
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.