From ec34ed1def00c103e15bffd385ce60a868d0dd86 Mon Sep 17 00:00:00 2001 From: furq-aws <127275086+furq-aws@users.noreply.github.com> Date: Tue, 30 Apr 2024 14:19:58 -0700 Subject: [PATCH] Internally construct and use stream ARNs for all streams in multi-stream mode (#1318) --- .../amazon/kinesis/common/ArnUtil.java | 32 ++++ .../amazon/kinesis/common/StreamConfig.java | 4 + .../amazon/kinesis/coordinator/Scheduler.java | 69 +++++++- .../kinesis/coordinator/SchedulerTest.java | 153 ++++++++++++++---- 4 files changed, 228 insertions(+), 30 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ArnUtil.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ArnUtil.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ArnUtil.java new file mode 100644 index 000000000..920376980 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/ArnUtil.java @@ -0,0 +1,32 @@ +package software.amazon.kinesis.common; + +import lombok.NonNull; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; + +import static software.amazon.awssdk.services.kinesis.KinesisAsyncClient.SERVICE_NAME; + +@KinesisClientInternalApi +public final class ArnUtil { + private static final String STREAM_RESOURCE_PREFIX = "stream/"; + + /** + * Construct a Kinesis stream ARN. + * + * @param region The region the stream exists in. + * @param accountId The account the stream belongs to. + * @param streamName The name of the stream. + * @return The {@link Arn} of the Kinesis stream. + */ + public static Arn constructStreamArn( + @NonNull final Region region, @NonNull final String accountId, @NonNull final String streamName) { + return Arn.builder() + .partition(region.metadata().partition().id()) + .service(SERVICE_NAME) + .region(region.id()) + .accountId(accountId) + .resource(STREAM_RESOURCE_PREFIX + streamName) + .build(); + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java index 8ca75decd..95ab05607 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamConfig.java @@ -15,10 +15,14 @@ package software.amazon.kinesis.common; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; +@AllArgsConstructor +@RequiredArgsConstructor @Data @Accessors(fluent = true) public class StreamConfig { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index bb389ce94..f9ddc0ba5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -49,8 +49,11 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.utils.Validate; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -97,6 +100,7 @@ import software.amazon.kinesis.retrieval.RetrievalConfig; import software.amazon.kinesis.schemaregistry.SchemaRegistryDecoder; +import static software.amazon.kinesis.common.ArnUtil.constructStreamArn; import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType; import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy.StreamsLeasesDeletionType.FORMER_STREAMS_AUTO_DETECTION_DEFERRED_DELETION; @@ -153,7 +157,7 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; - private final Map currentStreamConfigMap = new ConcurrentHashMap<>(); + private final Map currentStreamConfigMap = new StreamConfigMap(); private final StreamTracker streamTracker; private final FormerStreamsLeasesDeletionStrategy formerStreamsLeasesDeletionStrategy; private final long listShardsBackoffTimeMillis; @@ -961,7 +965,7 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, // to gracefully complete the reading. StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); if (streamConfig == null) { - streamConfig = streamTracker.createStreamConfig(streamIdentifier); + streamConfig = withStreamArn(streamTracker.createStreamConfig(streamIdentifier), getKinesisRegion()); log.info("Created orphan {}", streamConfig); } Validate.notNull(streamConfig, "StreamConfig should not be null"); @@ -1051,6 +1055,67 @@ private StreamIdentifier getStreamIdentifier(Optional streamIdentifierSt return streamIdentifier; } + private Region getKinesisRegion() { + return retrievalConfig.kinesisClient().serviceClientConfiguration().region(); + } + + /** + * Create and return a copy of a {@link StreamConfig} object + * with {@link StreamIdentifier#streamArnOptional()} populated. + * Only to be used in multi-stream mode. + * + * @param streamConfig The {@link StreamConfig} object to return a copy of. + * @param kinesisRegion The {@link Region} the stream exists in, to be used for constructing the {@link Arn}. + * @return A copy of the {@link StreamConfig} with {@link StreamIdentifier#streamArnOptional()} populated. + */ + private static StreamConfig withStreamArn( + @NonNull final StreamConfig streamConfig, @NonNull final Region kinesisRegion) { + Validate.isTrue(streamConfig.streamIdentifier().accountIdOptional().isPresent(), + "accountId should not be empty"); + Validate.isTrue(streamConfig.streamIdentifier().streamCreationEpochOptional().isPresent(), + "streamCreationEpoch should not be empty"); + + log.info("Constructing stream ARN for {} using the Kinesis client's configured region - {}.", + streamConfig.streamIdentifier(), kinesisRegion); + + final StreamIdentifier streamIdentifierWithArn = StreamIdentifier.multiStreamInstance( + constructStreamArn( + kinesisRegion, + streamConfig.streamIdentifier().accountIdOptional().get(), + streamConfig.streamIdentifier().streamName()), + streamConfig.streamIdentifier().streamCreationEpochOptional().get()); + + return new StreamConfig( + streamIdentifierWithArn, streamConfig.initialPositionInStreamExtended(), streamConfig.consumerArn()); + } + + @RequiredArgsConstructor + private class StreamConfigMap extends ConcurrentHashMap { + /** + * If {@link StreamIdentifier#streamArnOptional()} is present for the provided + * {@link StreamConfig#streamIdentifier()}, validates that the region in the stream ARN is consistent with the + * region that the Kinesis client ({@link RetrievalConfig#kinesisClient()}) is configured with. + *

+ * In multi-stream mode, ensures stream ARN is always present by constructing it using the Kinesis client + * region when {@link StreamIdentifier#streamArnOptional()} is {@link Optional#empty()}. + *

+ * {@inheritDoc} + */ + @Override + public StreamConfig put( + @NonNull final StreamIdentifier streamIdentifier, @NonNull final StreamConfig streamConfig) { + final Region kinesisRegion = getKinesisRegion(); + + return super.put(streamIdentifier, streamConfig.streamIdentifier().streamArnOptional() + .map(streamArn -> { + Validate.isTrue(kinesisRegion.id().equals(streamArn.region().get()), + "The provided streamARN " + streamArn + + " does not match the Kinesis client's configured region - " + kinesisRegion); + return streamConfig; + }).orElse(isMultiStreamMode ? withStreamArn(streamConfig, kinesisRegion) : streamConfig)); + } + } + /** * Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at * INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 9671bb78e..1820f2dd8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -46,11 +47,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.RejectedExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import com.google.common.base.Joiner; import com.google.common.collect.Sets; @@ -61,14 +64,19 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import org.mockito.stubbing.OngoingStubbing; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.KinesisServiceClientConfiguration; +import software.amazon.awssdk.utils.StringUtils; import software.amazon.kinesis.checkpoint.Checkpoint; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.CheckpointFactory; @@ -129,6 +137,13 @@ public class SchedulerTest { private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; + private static final Region TEST_REGION = Region.US_EAST_2; + private static final int ACCOUNT_ID_LENGTH = 12; + private static final long TEST_ACCOUNT = Long.parseLong(StringUtils.repeat("1", ACCOUNT_ID_LENGTH)); + private static final long TEST_EPOCH = 1234567890L; + private static final String TEST_SHARD_ID = "shardId-000000000001"; + private static final InitialPositionInStreamExtended TEST_INITIAL_POSITION = + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); private Scheduler scheduler; private ShardRecordProcessorFactory shardRecordProcessorFactory; @@ -193,6 +208,8 @@ public void setup() { when(retrievalFactory.createGetRecordsCache(any(ShardInfo.class), any(StreamConfig.class), any(MetricsFactory.class))).thenReturn(recordsPublisher); when(shardDetector.streamIdentifier()).thenReturn(mock(StreamIdentifier.class)); + when(kinesisClient.serviceClientConfiguration()) + .thenReturn(KinesisServiceClientConfiguration.builder().region(TEST_REGION).build()); scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig); @@ -404,12 +421,12 @@ public final void testMultiStreamNoStreamsAreSyncedWhenStreamsAreNotRefreshed() throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); List streamConfigList2 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) @@ -428,12 +445,12 @@ public final void testMultiStreamOnlyNewStreamsAreSynced() throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); List streamConfigList2 = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) @@ -444,7 +461,7 @@ public final void testMultiStreamOnlyNewStreamsAreSynced() when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(5, 7).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(Sets.newHashSet(streamConfigList2), @@ -456,7 +473,7 @@ public final void testMultiStreamSyncFromTableDefaultInitPos() { // Streams in lease table but not tracked by multiStreamTracker List leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() .streamIdentifier( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)) .shardId("some_random_shard_id")) .collect(Collectors.toCollection(LinkedList::new)); // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later @@ -466,7 +483,7 @@ public final void testMultiStreamSyncFromTableDefaultInitPos() { // By default, Stream not present in multiStreamTracker will have initial position of LATEST List expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); // Include default configs @@ -489,7 +506,7 @@ public final void testMultiStreamSyncFromTableCustomInitPos() { // Streams in lease table but not tracked by multiStreamTracker List leasesInTable = IntStream.range(1, 3).mapToObj(streamId -> new MultiStreamLease() .streamIdentifier( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)) .shardId("some_random_shard_id")) .collect(Collectors.toCollection(LinkedList::new)); // Include a stream that is already tracked by multiStreamTracker, just to make sure we will not touch this stream config later @@ -499,7 +516,7 @@ public final void testMultiStreamSyncFromTableCustomInitPos() { // Stream not present in multiStreamTracker will have initial position specified by orphanedStreamInitialPositionInStream List expectedConfig = IntStream.range(1, 3).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPositionAtTimestamp(testTimeStamp))) .collect(Collectors.toCollection(LinkedList::new)); // Include default configs @@ -557,7 +574,7 @@ public final void testMultiStreamStaleStreamsAreNotDeletedImmediatelyProvidedLis when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); } @@ -573,12 +590,12 @@ private void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectP throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) @@ -590,7 +607,7 @@ private void testMultiStreamStaleStreamsAreNotDeletedImmediately(boolean expectP when(scheduler.shouldSyncStreamsNow()).thenReturn(true); Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedPendingStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); Set expectedSyncedStreams = onlyStreamsDeletionNotLeases ? expectedPendingStreams : Sets.newHashSet(); Assert.assertEquals(expectedSyncedStreams, syncedStreams); @@ -625,7 +642,7 @@ public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithP }); HashSet currentStreamConfigMapOverride = IntStream.range(1, 5).mapToObj( streamId -> new StreamConfig(StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(HashSet::new)); testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(false, currentStreamConfigMapOverride); @@ -637,7 +654,7 @@ public final void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriodWithP when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new ProvidedStreamsDeferredDeletionStrategy() { @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(ArrayList::new)); } @@ -653,12 +670,12 @@ private void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean e throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); List streamConfigList2 = IntStream.range(3, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) @@ -671,7 +688,7 @@ private void testMultiStreamStaleStreamsAreDeletedAfterDefermentPeriod(boolean e Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.range(1, 3).mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))).collect( + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))).collect( Collectors.toCollection(HashSet::new)); Assert.assertEquals(expectSyncedStreams ? expectedSyncedStreams : Sets.newHashSet(), syncedStreams); Assert.assertEquals(currentStreamConfigMapOverride == null ? Sets.newHashSet(streamConfigList2) : currentStreamConfigMapOverride, @@ -720,7 +737,7 @@ public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeleted @Override public List streamIdentifiersForLeaseCleanup() { return IntStream.range(1, 3) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(ArrayList::new)); } @@ -750,18 +767,18 @@ private void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmed Set expectedSyncedStreams; Set expectedPendingStreams = IntStream.range(1, 3) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); if (onlyStreamsNoLeasesDeletion) { expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); } else { expectedSyncedStreams = IntStream.range(5, 7) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); } @@ -770,13 +787,13 @@ private void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmed if (onlyStreamsNoLeasesDeletion) { expectedCurrentStreamConfigs = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); } else { expectedCurrentStreamConfigs = IntStream.range(1, 7).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); } @@ -852,7 +869,7 @@ private List createDummyStreamConfigList(int lowerBound, int upper private StreamConfig createDummyStreamConfig(int streamId){ return new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); } @@ -861,12 +878,12 @@ public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAft throws DependencyException, ProvisionedThroughputException, InvalidStateException { List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345)), InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) .collect(Collectors.toCollection(LinkedList::new)); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) @@ -883,7 +900,7 @@ public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAft Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); Set expectedSyncedStreams = IntStream.concat(IntStream.range(1, 3), IntStream.range(5, 7)) .mapToObj(streamId -> StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345))) + Joiner.on(":").join(streamId * TEST_ACCOUNT, "multiStreamTest-" + streamId, streamId * 12345))) .collect(Collectors.toCollection(HashSet::new)); Assert.assertEquals(expectedSyncedStreams, syncedStreams); Assert.assertEquals(Sets.newHashSet(streamConfigList2), @@ -1080,6 +1097,86 @@ private void mockListLeases(List configs) throws ProvisionedThroug .shardId("some_random_shard_id")).collect(Collectors.toList())); } + @Test + public void testStreamConfigsArePopulatedWithStreamArnsInMultiStreamMode() { + final String streamArnStr = constructStreamArnStr(TEST_REGION, 111122223333L, "some-stream-name"); + when(multiStreamTracker.streamConfigList()).thenReturn(Stream.of( + // Each of scheduler's currentStreamConfigMap entries should have a streamARN in + // multi-stream mode, regardless of whether the streamTracker-provided streamIdentifiers + // were created using serialization or stream ARN. + StreamIdentifier.multiStreamInstance(constructStreamIdentifierSer(TEST_ACCOUNT, streamName)), + StreamIdentifier.multiStreamInstance(Arn.fromString(streamArnStr), TEST_EPOCH) + ) + .map(streamIdentifier -> new StreamConfig(streamIdentifier, TEST_INITIAL_POSITION)) + .collect(Collectors.toList())); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + final Set expectedStreamArns = + Sets.newHashSet(constructStreamArnStr(TEST_REGION, TEST_ACCOUNT, streamName), streamArnStr); + + final Set actualStreamArns = scheduler.currentStreamConfigMap().values().stream() + .map(sc -> sc.streamIdentifier().streamArnOptional().orElseThrow(IllegalStateException::new).toString()) + .collect(Collectors.toSet()); + + assertEquals(expectedStreamArns, actualStreamArns); + } + + @Test + public void testOrphanStreamConfigIsPopulatedWithArn() { + final String streamIdentifierSerializationForOrphan = constructStreamIdentifierSer(TEST_ACCOUNT, streamName); + assertFalse(multiStreamTracker.streamConfigList().stream() + .map(sc -> sc.streamIdentifier().serialize()) + .collect(Collectors.toSet()) + .contains(streamIdentifierSerializationForOrphan)); + + when(leaseCoordinator.getCurrentAssignments()).thenReturn(Collections.singletonList( + new ShardInfo(TEST_SHARD_ID, null, null, null, streamIdentifierSerializationForOrphan))); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig); + + scheduler.runProcessLoop(); + + verify(multiStreamTracker).createStreamConfig( + StreamIdentifier.multiStreamInstance(streamIdentifierSerializationForOrphan)); + + final ArgumentCaptor streamConfigArgumentCaptor = ArgumentCaptor.forClass(StreamConfig.class); + verify(retrievalFactory).createGetRecordsCache(any(), streamConfigArgumentCaptor.capture(), any()); + + final StreamConfig actualStreamConfigForOrphan = streamConfigArgumentCaptor.getValue(); + final Optional streamArnForOrphan = actualStreamConfigForOrphan.streamIdentifier().streamArnOptional(); + assertTrue(streamArnForOrphan.isPresent()); + assertEquals(constructStreamArnStr(TEST_REGION, TEST_ACCOUNT, streamName), streamArnForOrphan.get().toString()); + } + + @Test + public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() { + final Region streamArnRegion = Region.US_WEST_1; + Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region()); + + when(multiStreamTracker.streamConfigList()).thenReturn(Collections.singletonList(new StreamConfig( + StreamIdentifier.multiStreamInstance( + Arn.fromString(constructStreamArnStr(streamArnRegion, TEST_ACCOUNT, streamName)), TEST_EPOCH), + TEST_INITIAL_POSITION))); + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + + assertThrows(IllegalArgumentException.class, () -> new Scheduler(checkpointConfig, coordinatorConfig, + leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig)); + } + + private static String constructStreamIdentifierSer(long accountId, String streamName) { + return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH)); + } + + private static String constructStreamArnStr(Region region, long accountId, String streamName) { + return "arn:aws:kinesis:" + region + ":" + accountId + ":stream/" + streamName; + } + /*private void runAndTestWorker(int numShards, int threadPoolSize) throws Exception { final int numberOfRecordsPerShard = 10; final String kinesisShardPrefix = "kinesis-0-";