Clean up in-memory state of deleted kinesis stream in MultiStreamMode #1022

Open · wants to merge 2 commits into base: master
Changes from 1 commit
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

### Release 2.4.6 (January 13, 2023)
* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode

### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

3 changes: 3 additions & 0 deletions README.md
@@ -66,6 +66,9 @@ The recommended way to use the KCL for Java is to consume it from Maven.

## Release Notes

### Release 2.4.6 (January 13, 2023)
* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode

### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

2 changes: 1 addition & 1 deletion amazon-kinesis-client-multilang/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>2.4.5</version>
<version>2.4.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion amazon-kinesis-client/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.4.5</version>
<version>2.4.6</version>
</parent>

<artifactId>amazon-kinesis-client</artifactId>
software/amazon/kinesis/coordinator/DeletedStreamListProvider.java (new file)
@@ -0,0 +1,38 @@
package software.amazon.kinesis.coordinator;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import software.amazon.kinesis.common.StreamIdentifier;

/**
* Stores an in-memory set of streams that no longer exist in Kinesis (i.e. have been deleted) and need to be
* cleaned up from the KCL's in-memory state.
*/
@Slf4j
public class DeletedStreamListProvider {

private final Set<StreamIdentifier> deletedStreams;

public DeletedStreamListProvider() {
deletedStreams = ConcurrentHashMap.newKeySet();
}

public void add(StreamIdentifier streamIdentifier) {
log.info("Added {}", streamIdentifier);
deletedStreams.add(streamIdentifier);
}

/**
* Returns and empties the current set of deleted streams.
*
* @return the set of deleted streams recorded since the last call
*/
public Set<StreamIdentifier> purgeAllDeletedStream() {
final Set<StreamIdentifier> response = new HashSet<>(deletedStreams);
deletedStreams.removeAll(response);
return response;
}
}
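A minimal usage sketch of the provider (the class name and the stream identifier literal below are illustrative; StreamIdentifier.multiStreamInstance is the same factory the shard syncer uses later in this diff). The shard-sync path records streams that no longer exist, and the Scheduler later drains the whole set in one call, after which the provider is empty again.

import java.util.Set;

import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;

public class DeletedStreamListProviderSketch {
    public static void main(String[] args) {
        DeletedStreamListProvider provider = new DeletedStreamListProvider();

        // Producer side (shard syncer): record a stream whose ListShards call failed with
        // ResourceNotFoundException. The "account:streamName:creationEpoch" literal follows the
        // multi-stream serialization used elsewhere in KCL; treat the exact value as illustrative.
        provider.add(StreamIdentifier.multiStreamInstance("123456789012:deleted-stream:1673568000"));

        // Consumer side (Scheduler): drain everything recorded so far in one shot.
        Set<StreamIdentifier> purged = provider.purgeAllDeletedStream();
        System.out.println("Streams to clean up: " + purged);

        // A second purge returns an empty set, because the first call emptied the in-memory state.
        System.out.println("After purge: " + provider.purgeAllDeletedStream());
    }
}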
software/amazon/kinesis/coordinator/Scheduler.java
@@ -41,7 +41,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -54,7 +53,6 @@
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@@ -75,7 +73,6 @@
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@@ -121,6 +118,7 @@ public class Scheduler implements Runnable {
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";

private SchedulerLog slog = new SchedulerLog();

@@ -173,6 +171,8 @@ public class Scheduler implements Runnable {
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;

private final DeletedStreamListProvider deletedStreamListProvider;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
@@ -217,6 +217,7 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
log.info("Scheduler invoked for version 2.4.6, V1");
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
@@ -263,9 +264,10 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.deletedStreamListProvider = new DeletedStreamListProvider();
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
.createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@@ -558,6 +560,14 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
// These are streams that were deleted in Kinesis, detected when the shard sync task hit
// ResourceNotFoundException. This applies only in MultiStreamMode; in SingleStreamMode the
// provider never holds any data.
final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider.purgeAllDeletedStream();
if (deletedStreamSet.size() > 0) {
log.info("Stale streams to delete: {}", deletedStreamSet);
staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
}
final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
streamsSynced.addAll(deletedStreamsLeases);

@@ -577,6 +587,8 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
} finally {
MetricsUtil.endScope(metricsScope);
@@ -614,6 +626,7 @@ private void removeStreamsFromStaleStreamsList(Set<StreamIdentifier> streamIdent

private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
log.info("Deleting streams: {}", streamIdentifiers);
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
List<MultiStreamLease> leases = null;
Map<String, List<MultiStreamLease>> streamIdToShardsMap = null;
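Read in isolation, the Scheduler-side consumption added above amounts to the following sketch (hypothetical class and method names; the real logic lives inside checkAndSyncStreamShardsAndLeases). The notable design point is that streams reported by the DeletedStreamListProvider bypass the waitPeriodToDeleteOldStreams grace period, because Kinesis has already confirmed they no longer exist.

import java.util.HashSet;
import java.util.Set;

import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;

class DeletedStreamMergeSketch {

    // Merges streams already confirmed deleted in Kinesis into the set of stale streams whose
    // multi-stream leases should be removed. This mirrors the diff above; it is not the actual
    // Scheduler method.
    static Set<StreamIdentifier> streamsToDelete(Set<StreamIdentifier> staleStreamIdsToBeDeleted,
            DeletedStreamListProvider deletedStreamListProvider) {
        // Stale streams have already waited out the configured grace period.
        final Set<StreamIdentifier> toDelete = new HashSet<>(staleStreamIdsToBeDeleted);

        // Streams reported by the provider are added unconditionally: ListShards already returned
        // ResourceNotFoundException for them, so there is nothing left to wait for.
        final Set<StreamIdentifier> deletedStreamSet = deletedStreamListProvider.purgeAllDeletedStream();
        toDelete.addAll(deletedStreamSet);
        return toDelete;
    }
}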
software/amazon/kinesis/leases/HierarchicalShardSyncer.java
@@ -17,6 +17,7 @@
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,6 +40,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
@@ -47,6 +49,7 @@
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@@ -56,6 +59,7 @@
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import static java.util.Objects.nonNull;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;

/**
@@ -72,20 +76,26 @@ public class HierarchicalShardSyncer {

private final String streamIdentifier;

private final DeletedStreamListProvider deletedStreamListProvider;

private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;

private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;

public HierarchicalShardSyncer() {
isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
this(false, "SingleStreamMode");
}

public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this(isMultiStreamMode, streamIdentifier, null);
}

public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
this.deletedStreamListProvider = deletedStreamListProvider;
}

private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
@@ -306,8 +316,17 @@ private static List<Shard> getShardListAtInitialPosition(@NonNull final ShardDet
+ retriesForCompleteHashRange + " retries.");
}

private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
// Default to an empty list so the pre-existing behavior for deleted streams is preserved
List<Shard> shardList = Collections.emptyList();
try {
shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException();
} catch (ResourceNotFoundException e) {
if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) {
deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
}
}
final Optional<List<Shard>> shards = Optional.of(shardList);

return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));
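The new getShardList control flow can be summarized by this standalone sketch, with a hypothetical Supplier standing in for the ShardDetector call: when the stream has been deleted, multi-stream mode records it for later lease cleanup and returns an empty shard list, so a missing stream no longer needs to abort the shard sync.

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;

class GetShardListFlowSketch {

    static List<Shard> getShardList(Supplier<List<Shard>> listShardsCall,
            boolean isMultiStreamMode,
            String streamIdentifier,
            DeletedStreamListProvider deletedStreamListProvider) {
        try {
            return listShardsCall.get();
        } catch (ResourceNotFoundException e) {
            // The stream was deleted in Kinesis. In multi-stream mode, remember it so the Scheduler
            // can purge its leases; in single-stream mode there is nothing to record.
            if (deletedStreamListProvider != null && isMultiStreamMode) {
                deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
            }
            return Collections.emptyList();
        }
    }
}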
software/amazon/kinesis/leases/KinesisShardDetector.java
@@ -80,6 +80,8 @@ public class KinesisShardDetector implements ShardDetector {
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);

private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;

@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
@@ -159,15 +161,26 @@ public List<Shard> listShards() {
return listShardsWithFilter(null);
}

@Override
@Synchronized
public List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
}

@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION);
}

private List<Shard> listShardsWithFilterInternal(ShardFilter shardFilter,
boolean shouldPropagateResourceNotFoundException) {
final List<Shard> shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;

do {
result = listShards(shardFilter, nextToken);
result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException);

if (result == null) {
/*
@@ -185,7 +198,12 @@ public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
return shards;
}

private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
/**
* @param shouldPropagateResourceNotFoundException whether a ResourceNotFoundException should be propagated
* to the caller; when false, the exception is swallowed and an empty shard list is returned.
*/
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken,
final boolean shouldPropagateResourceNotFoundException) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
@@ -233,9 +251,14 @@ private ListShardsResponse listShards(ShardFilter shardFilter, final String next
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
return ListShardsResponse.builder().shards(Collections.emptyList())
.nextToken(null)
.build();
if (shouldPropagateResourceNotFoundException) {
throw e;
}
return ListShardsResponse.builder()
.shards(Collections.emptyList())
.nextToken(null)
.build();

} catch (TimeoutException te) {
throw new RuntimeException(te);
}
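The detector change boils down to one flag that decides whether ResourceNotFoundException is swallowed (the pre-existing listShards behavior) or rethrown (the new method). A stripped-down sketch of that pattern, using a hypothetical fetch function in place of the real Kinesis client call:

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;

class ListShardsErrorHandlingSketch {

    private static final boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;

    // Existing behavior: a deleted stream looks like a stream with no shards.
    static List<Shard> listShards(Supplier<List<Shard>> fetch) {
        return listShardsInternal(fetch, !THROW_RESOURCE_NOT_FOUND_EXCEPTION);
    }

    // New behavior: a deleted stream surfaces as ResourceNotFoundException so callers
    // (the HierarchicalShardSyncer) can record it in the DeletedStreamListProvider.
    static List<Shard> listShardsWithoutConsumingResourceNotFoundException(Supplier<List<Shard>> fetch) {
        return listShardsInternal(fetch, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
    }

    private static List<Shard> listShardsInternal(Supplier<List<Shard>> fetch,
            boolean shouldPropagateResourceNotFoundException) {
        try {
            return fetch.get();
        } catch (ResourceNotFoundException e) {
            if (shouldPropagateResourceNotFoundException) {
                throw e;
            }
            return Collections.emptyList();
        }
    }
}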
software/amazon/kinesis/leases/LeaseManagementFactory.java
@@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;

import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;

@@ -31,6 +32,11 @@ default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFa
throw new UnsupportedOperationException();
}

default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
DeletedStreamListProvider deletedStreamListProvider) {
throw new UnsupportedOperationException("createShardSyncTaskManager method not implemented");
}

DynamoDBLeaseRefresher createLeaseRefresher();

ShardDetector createShardDetector();
software/amazon/kinesis/leases/ShardDetector.java
@@ -46,6 +46,16 @@ public interface ShardDetector {
*/
List<Shard> listShards();

/**
* Behaves like {@link #listShards()}, except that a ResourceNotFoundException is propagated to the
* caller instead of being swallowed and converted into an empty shard list.
*
* @return the stream's shards
*/
default List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
throw new UnsupportedOperationException("listShardsWithoutConsumingResourceNotFoundException not implemented");
}

/**
* List shards with shard filter.
*
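Because the new methods on ShardDetector (and on LeaseManagementFactory above) are defaults that throw UnsupportedOperationException, existing implementations keep compiling unchanged, and only implementations that opt in, such as KinesisShardDetector, expose the stricter behavior. A sketch of that pattern using a hypothetical trimmed-down interface (the real ShardDetector declares several more methods):

import java.util.Collections;
import java.util.List;

import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;

// Hypothetical, trimmed-down analogue of ShardDetector used only to illustrate the
// backward-compatibility pattern; it is not part of this PR.
interface MiniShardDetector {
    List<Shard> listShards();

    default List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
        throw new UnsupportedOperationException(
                "listShardsWithoutConsumingResourceNotFoundException not implemented");
    }
}

// An older implementation compiles unchanged and simply never supports the new call.
class LegacyDetector implements MiniShardDetector {
    @Override
    public List<Shard> listShards() {
        return Collections.emptyList();
    }
}

// An implementation that opts in surfaces ResourceNotFoundException instead of hiding it;
// here a deleted stream is simulated by always throwing.
class StrictDetector implements MiniShardDetector {
    @Override
    public List<Shard> listShards() {
        try {
            return listShardsWithoutConsumingResourceNotFoundException();
        } catch (ResourceNotFoundException e) {
            return Collections.emptyList();
        }
    }

    @Override
    public List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
        throw ResourceNotFoundException.builder().message("stream was deleted").build();
    }
}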
software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java
@@ -29,6 +29,7 @@
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@@ -515,6 +516,29 @@ public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFac
metricsFactory);
}

/**
* Creates a ShardSyncTaskManager for the given stream, wiring in the DeletedStreamListProvider so that
* streams found to be deleted during shard sync can be cleaned up.
*
* @param metricsFactory factory used to emit metrics
* @param streamConfig stream for which the ShardSyncTaskManager is created
* @param deletedStreamListProvider store for streams discovered to be deleted during shard sync
* @return ShardSyncTaskManager
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
DeletedStreamListProvider deletedStreamListProvider) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(),
deletedStreamListProvider),
metricsFactory);
}


@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,