diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index 7fb160730..47990226c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -45,24 +45,24 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie private static final int TIME_TO_KEEP_ALIVE = 5; private static final int CORE_THREAD_POOL_COUNT = 1; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final ExecutorService executorService; private final int retryGetRecordsInSeconds; private final String shardId; final Supplier> completionServiceSupplier; - public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(@NonNull final IDataFetcher dataFetcher, final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId); } - public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(final IDataFetcher dataFetcher, final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId); } - AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, + AsynchronousGetRecordsRetrievalStrategy(IDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, Supplier> completionServiceSupplier, String shardId) { this.dataFetcher = dataFetcher; @@ -148,7 +148,7 @@ private static ExecutorService buildExector(int maxGetRecordsThreadPool, String } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java index 4f4c49b81..2f001b2e9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java @@ -30,7 +30,7 @@ * If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly * proceed with processing data from the shard. */ -class BlockOnParentShardTask implements ITask { +public class BlockOnParentShardTask implements ITask { private static final Log LOG = LogFactory.getLog(BlockOnParentShardTask.class); private final ShardInfo shardInfo; @@ -45,7 +45,7 @@ class BlockOnParentShardTask implements ITask { * @param leaseManager Used to fetch the lease and checkpoint info for parent shards * @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing */ - BlockOnParentShardTask(ShardInfo shardInfo, + public BlockOnParentShardTask(ShardInfo shardInfo, ILeaseManager leaseManager, long parentShardPollIntervalMillis) { this.shardInfo = shardInfo; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java index 71a153408..d831a4f1c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java @@ -46,9 +46,9 @@ public interface GetRecordsRetrievalStrategy { boolean isShutdown(); /** - * Returns the KinesisDataFetcher used to getRecords from Kinesis. + * Returns the IDataFetcher used to getRecords * - * @return KinesisDataFetcher + * @return IDataFetcher */ - KinesisDataFetcher getDataFetcher(); + IDataFetcher getDataFetcher(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java new file mode 100644 index 000000000..29f088d15 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java @@ -0,0 +1,23 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.ChildShard; + +import java.util.List; + +public interface IDataFetcher { + + DataFetcherResult getRecords(int maxRecords); + + void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); + + void restartIterator(); + + boolean isShardEndReached(); + + List getChildShards(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java new file mode 100644 index 000000000..ac2e31d1f --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java @@ -0,0 +1,25 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public interface IShardConsumer { + + boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist(); + + enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND + } + + boolean consumeShard(); + + boolean isShutdown(); + + ShutdownReason getShutdownReason(); + + boolean beginShutdown(); + + void notifyShutdownRequested(ShutdownNotification shutdownNotification); + + KinesisConsumerStates.ShardConsumerState getCurrentState(); + + boolean isShutdownRequested(); + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java new file mode 100644 index 000000000..acb23018e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java @@ -0,0 +1,34 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public interface IShardConsumerFactory { + + /** + * Returns a shard consumer to be used for consuming a (assigned) shard. + * + * @return Returns a shard consumer object. + */ + IShardConsumer createShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpointTracker, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesUponShardCompletion, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long taskBackoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 5343470f8..0b4fa6510 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -29,7 +29,7 @@ /** * Task for initializing shard position and invoking the RecordProcessor initialize() API. */ -class InitializeTask implements ITask { +public class InitializeTask implements ITask { private static final Log LOG = LogFactory.getLog(InitializeTask.class); @@ -37,7 +37,7 @@ class InitializeTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.INITIALIZE; private final ICheckpoint checkpoint; private final RecordProcessorCheckpointer recordProcessorCheckpointer; @@ -49,11 +49,11 @@ class InitializeTask implements ITask { /** * Constructor. */ - InitializeTask(ShardInfo shardInfo, + public InitializeTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, ICheckpoint checkpoint, RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, + IDataFetcher dataFetcher, long backoffTimeMillis, StreamConfig streamConfig, GetRecordsCache getRecordsCache) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index ccde83f35..7aa2f885b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -61,7 +61,7 @@ public class KinesisClientLibConfiguration { public static final int DEFAULT_MAX_RECORDS = 10000; /** - * The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + * The default value for how long the {@link KinesisShardConsumer} should sleep if no records are returned from the call to * {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}. */ public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L; @@ -91,7 +91,7 @@ public class KinesisClientLibConfiguration { public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true; /** - * Interval to run lease cleanup thread in {@link LeaseCleanupManager}. + * Interval to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}. */ private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); @@ -1030,7 +1030,7 @@ public int getInitialLeaseTableWriteCapacity() { * Keeping it protected to forbid outside callers from depending on this internal object. * @return The initialPositionInStreamExtended object. */ - protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() { + public InitialPositionInStreamExtended getInitialPositionInStreamExtended() { return initialPositionInStreamExtended; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 47baad044..ad55ab523 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -51,7 +51,7 @@ /** * This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints. */ -class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { +public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class); @@ -368,7 +368,7 @@ void runLeaseRenewer() throws DependencyException, InvalidStateException { * * @return LeaseManager */ - ILeaseManager getLeaseManager() { + public ILeaseManager getLeaseManager() { return leaseManager; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java similarity index 90% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java index 5cf55dbfe..766d6fbae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java @@ -15,7 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** - * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, + * Top level container for all the possible states a {@link KinesisShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. * *

State Diagram

@@ -64,12 +64,12 @@ * +-------------------+ * */ -class ConsumerStates { +public class KinesisConsumerStates { /** * Enumerates processing states when working on a shard. */ - enum ShardConsumerState { + public enum ShardConsumerState { // @formatter:off WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), INITIALIZING(new InitializingState()), @@ -96,7 +96,7 @@ public ConsumerState getConsumerState() { * do when a transition occurs. * */ - interface ConsumerState { + public interface ConsumerState { /** * Creates a new task for this state using the passed in consumer to build the task. If there is no task * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the @@ -106,11 +106,11 @@ interface ConsumerState { * the consumer to use build the task, or execute state. * @return a valid task for this state or null if there is no task required. */ - ITask createTask(ShardConsumer consumer); + ITask createTask(KinesisShardConsumer consumer); /** * Provides the next state of the consumer upon success of the task return by - * {@link ConsumerState#createTask(ShardConsumer)}. + * {@link ConsumerState#createTask(KinesisShardConsumer)}. * * @return the next state that the consumer should transition to, this may be the same object as the current * state. @@ -129,7 +129,7 @@ interface ConsumerState { ConsumerState shutdownTransition(ShutdownReason shutdownReason); /** - * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state + * The type of task that {@link ConsumerState#createTask(KinesisShardConsumer)} would return. This is always a valid state * even if createTask would return a null value. * * @return the type of task that this state represents. @@ -149,7 +149,7 @@ interface ConsumerState { } /** - * The initial state that any {@link ShardConsumer} should start in. + * The initial state that any {@link KinesisShardConsumer} should start in. */ static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); @@ -187,7 +187,7 @@ private static ConsumerState shutdownStateFor(ShutdownReason reason) { static class BlockedOnParentState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), consumer.getParentShardPollIntervalMillis()); } @@ -247,10 +247,10 @@ public boolean isTerminal() { * * */ - static class InitializingState implements ConsumerState { + public static class InitializingState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), @@ -311,7 +311,7 @@ public boolean isTerminal() { static class ProcessingState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), @@ -358,10 +358,10 @@ public boolean isTerminal() { *

Valid Transitions

*
*
Success
- *
Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
*
Shutdown
*
At this point records are being retrieved, and processed. An explicit shutdown will allow the record - * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state. + * processor one last chance to checkpoint, and then the {@link KinesisShardConsumer} will be held in an idle state. *
*
{@link ShutdownReason#REQUESTED}
*
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to @@ -377,7 +377,7 @@ public boolean isTerminal() { static class ShutdownNotificationState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getShutdownNotification(), @@ -414,24 +414,24 @@ public boolean isTerminal() { } /** - * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the - * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state. + * Once the {@link ShutdownNotificationState} has been completed the {@link KinesisShardConsumer} must not re-enter any of the + * processing states. This state idles the {@link KinesisShardConsumer} until the worker triggers the final shutdown state. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Remains in the {@link ShutdownNotificationCompletionState} *

*
*
Shutdown
- *
At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is + *
At this point the {@link KinesisShardConsumer} has notified the record processor of the impending shutdown, and is * waiting that notification. While waiting for the notification no further processing should occur on the - * {@link ShardConsumer}. + * {@link KinesisShardConsumer}. *
*
{@link ShutdownReason#REQUESTED}
*
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains @@ -447,7 +447,7 @@ public boolean isTerminal() { static class ShutdownNotificationCompletionState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return null; } @@ -481,14 +481,14 @@ public boolean isTerminal() { } /** - * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard. + * This state is entered if the {@link KinesisShardConsumer} loses its lease, or reaches the end of the shard. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Transitions to the {@link ShutdownCompleteState} @@ -497,7 +497,7 @@ public boolean isTerminal() { *

Shutdown
*
At this point the record processor has processed the final shutdown indication, and depending on the shutdown * reason taken the correct course of action. From this point on there should be no more interactions with the - * record processor or {@link ShardConsumer}. + * record processor or {@link KinesisShardConsumer}. *
*
{@link ShutdownReason#REQUESTED}
*
@@ -519,8 +519,8 @@ public boolean isTerminal() { static class ShuttingDownState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), + public ITask createTask(KinesisShardConsumer consumer) { + return new KinesisShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), @@ -562,21 +562,21 @@ public boolean isTerminal() { } /** - * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed. + * This is the final state for the {@link KinesisShardConsumer}. This occurs once all shutdown activities are completed. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Remains in the {@link ShutdownCompleteState} *

*
*
Shutdown
- *
At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any + *
At this point the all shutdown activites are completed, and the {@link KinesisShardConsumer} should not take any * further actions. *
*
{@link ShutdownReason#REQUESTED}
@@ -599,7 +599,7 @@ public boolean isTerminal() { static class ShutdownCompleteState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { if (consumer.getShutdownNotification() != null) { consumer.getShutdownNotification().shutdownComplete(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index ae4e321d4..4c8d638b9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -39,7 +39,7 @@ /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ -class KinesisDataFetcher { +public class KinesisDataFetcher implements IDataFetcher{ private static final Log LOG = LogFactory.getLog(KinesisDataFetcher.class); @@ -185,7 +185,7 @@ public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPosition * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ - void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { + public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) { @@ -276,11 +276,11 @@ public void restartIterator() { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return isShardEndReached; } - protected List getChildShards() { + public List getChildShards() { return childShards; } @@ -290,5 +290,4 @@ protected List getChildShards() { String getNextIterator() { return nextIterator; } - } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java similarity index 92% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java index 11ee39b5d..b8e227c43 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java @@ -14,7 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; - import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -35,7 +34,6 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.annotations.VisibleForTesting; - import lombok.Getter; /** @@ -43,9 +41,9 @@ * The instance should be shutdown when we lose the primary responsibility for a shard. * A new instance should be created if the primary responsibility is reassigned back to this process. */ -class ShardConsumer { +public class KinesisShardConsumer implements IShardConsumer{ - private static final Log LOG = LogFactory.getLog(ShardConsumer.class); + private static final Log LOG = LogFactory.getLog(KinesisShardConsumer.class); private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; @@ -78,7 +76,7 @@ class ShardConsumer { @Getter private final GetRecordsCache getRecordsCache; - private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, + private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, ShardInfo shardInfo) { @@ -93,7 +91,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * much coordination/synchronization to handle concurrent reads/updates. */ - private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE; + private KinesisConsumerStates.ConsumerState currentState = KinesisConsumerStates.INITIAL_STATE; /* * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. @@ -116,7 +114,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated - ShardConsumer(ShardInfo shardInfo, + KinesisShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, @@ -162,7 +160,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated - ShardConsumer(ShardInfo shardInfo, + KinesisShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, @@ -223,7 +221,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * @param shardSyncer shardSyncer instance used to check and create new leases */ @Deprecated - ShardConsumer(ShardInfo shardInfo, + KinesisShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, IRecordProcessor recordProcessor, @@ -269,23 +267,23 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * @param shardSyncer shardSyncer instance used to check and create new leases * @param leaseCleanupManager used to clean up leases in lease table. */ - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, - LeaseCleanupManager leaseCleanupManager) { + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -314,7 +312,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * * @return true if a new process task was submitted, false otherwise */ - synchronized boolean consumeShard() { + public synchronized boolean consumeShard() { return checkAndSubmitNextTask(); } @@ -373,10 +371,6 @@ public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { return skipShardSyncAtWorkerInitializationIfLeasesExist; } - private enum TaskOutcome { - SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND - } - private TaskOutcome determineTaskOutcome() { try { TaskResult result = future.get(); @@ -423,7 +417,7 @@ private void logTaskException(TaskResult taskResult) { * * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. */ - void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { this.shutdownNotification = shutdownNotification; markForShutdown(ShutdownReason.REQUESTED); } @@ -434,7 +428,7 @@ void notifyShutdownRequested(ShutdownNotification shutdownNotification) { * * @return true if shutdown is complete (false if shutdown is still in progress) */ - synchronized boolean beginShutdown() { + public synchronized boolean beginShutdown() { markForShutdown(ShutdownReason.ZOMBIE); checkAndSubmitNextTask(); @@ -454,14 +448,14 @@ synchronized void markForShutdown(ShutdownReason reason) { * * @return true if shutdown is complete */ - boolean isShutdown() { + public boolean isShutdown() { return currentState.isTerminal(); } /** * @return the shutdownReason */ - ShutdownReason getShutdownReason() { + public ShutdownReason getShutdownReason() { return shutdownReason; } @@ -497,7 +491,7 @@ void updateState(TaskOutcome taskOutcome) { } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); - } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { + } else if (isShutdownRequested() && KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { @@ -516,7 +510,7 @@ void updateState(TaskOutcome taskOutcome) { } @VisibleForTesting - boolean isShutdownRequested() { + public boolean isShutdownRequested() { return shutdownReason != null; } @@ -525,7 +519,7 @@ boolean isShutdownRequested() { * * @return the currentState */ - ConsumerStates.ShardConsumerState getCurrentState() { + public KinesisConsumerStates.ShardConsumerState getCurrentState() { return currentState.getState(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java new file mode 100644 index 000000000..bbcae852c --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public class KinesisShardConsumerFactory implements IShardConsumerFactory{ + + @Override + public IShardConsumer createShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpointTracker, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesUponShardCompletion, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long taskBackoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { + return new KinesisShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + recordProcessorCheckpointer, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config, shardSyncer, shardSyncStrategy, + leaseCleanupManager); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java similarity index 99% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java index 07c9fff23..a444f26fe 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java @@ -14,26 +14,25 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; +import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.leases.exceptions.CustomerApplicationException; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.UpdateField; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; -import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; -import com.google.common.annotations.VisibleForTesting; - import java.util.List; import java.util.Objects; import java.util.Optional; @@ -44,9 +43,9 @@ /** * Task for invoking the RecordProcessor shutdown() callback. */ -class ShutdownTask implements ITask { +public class KinesisShutdownTask implements ITask { - private static final Log LOG = LogFactory.getLog(ShutdownTask.class); + private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class); @VisibleForTesting static final int RETRY_RANDOM_MAX_RANGE = 50; @@ -72,7 +71,7 @@ class ShutdownTask implements ITask { * Constructor. */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - ShutdownTask(ShardInfo shardInfo, + KinesisShutdownTask(ShardInfo shardInfo, IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownReason reason, diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java index cdf73e824..35b07b9dc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java @@ -407,4 +407,4 @@ public int compare(KinesisClientLease lease, KinesisClientLease otherLease) { .result(); } } -} +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index a4cf74d88..3e4dbcb76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -60,7 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private PrefetchCounters prefetchCounters; private boolean started = false; private final String operation; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final String shardId; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index cd543e23d..615874501 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -41,7 +41,7 @@ /** * Task for fetching data records and invoking processRecords() on the record processor instance. */ -class ProcessTask implements ITask { +public class ProcessTask implements ITask { private static final Log LOG = LogFactory.getLog(ProcessTask.class); @@ -55,7 +55,7 @@ class ProcessTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.PROCESS; private final StreamConfig streamConfig; private final long backoffTimeMillis; @@ -81,7 +81,7 @@ class ProcessTask implements ITask { * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, @@ -107,7 +107,7 @@ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProces * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index dbee92189..49c1b0e56 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -37,7 +37,7 @@ * The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. */ -class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { +public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { private static final Log LOG = LogFactory.getLog(RecordProcessorCheckpointer.class); @@ -62,7 +62,7 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * @param checkpoint Used to checkpoint progress of a RecordProcessor * @param validator Used for validating sequence numbers */ - RecordProcessorCheckpointer(ShardInfo shardInfo, + public RecordProcessorCheckpointer(ShardInfo shardInfo, ICheckpoint checkpoint, SequenceNumberValidator validator, IMetricsFactory metricsFactory) { @@ -231,7 +231,7 @@ public synchronized IPreparedCheckpointer prepareCheckpoint(String sequenceNumbe /** * @return the lastCheckpointValue */ - ExtendedSequenceNumber getLastCheckpointValue() { + public ExtendedSequenceNumber getLastCheckpointValue() { return lastCheckpointValue; } @@ -244,14 +244,14 @@ synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckp * * @return the largest permitted checkpoint */ - synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { + public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { return largestPermittedCheckpointValue; } /** * @param largestPermittedCheckpointValue the largest permitted checkpoint */ - synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { + public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; } @@ -262,7 +262,7 @@ synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber larg * * @param extendedSequenceNumber */ - synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { + public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { this.sequenceNumberAtShardEnd = extendedSequenceNumber; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index 3ca28235c..ac81dc8b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -51,7 +51,7 @@ public class SequenceNumberValidator { * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers * being validated */ - SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { + public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { this.proxy = proxy; this.shardId = shardId; this.validateWithGetIterator = validateWithGetIterator; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java index ecf5041f9..98e4702e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -21,14 +21,14 @@ /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. */ -class ShutdownNotificationTask implements ITask { +public class ShutdownNotificationTask implements ITask { private final IRecordProcessor recordProcessor; private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShutdownNotification shutdownNotification; private final ShardInfo shardInfo; - ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { + public ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.shutdownNotification = shutdownNotification; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java index 5e29d6ddb..c326e361d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java @@ -15,8 +15,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; /** @@ -72,7 +72,7 @@ public boolean canTransitionTo(ShutdownReason reason) { return reason.rank > this.rank; } - ConsumerState getShutdownState() { + public ConsumerState getShutdownState() { return shutdownState; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index fff6b71f6..7ef5be9c8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -19,7 +19,7 @@ /** * Used to capture stream configuration and pass it along. */ -class StreamConfig { +public class StreamConfig { private final IKinesisProxy streamProxy; private final int maxRecords; @@ -54,7 +54,7 @@ class StreamConfig { /** * @return the streamProxy */ - IKinesisProxy getStreamProxy() { + public IKinesisProxy getStreamProxy() { return streamProxy; } @@ -82,14 +82,14 @@ boolean shouldCallProcessRecordsEvenForEmptyRecordList() { /** * @return the initialPositionInStream */ - InitialPositionInStreamExtended getInitialPositionInStream() { + public InitialPositionInStreamExtended getInitialPositionInStream() { return initialPositionInStream; } /** * @return validateSequenceNumberBeforeCheckpointing */ - boolean shouldValidateSequenceNumberBeforeCheckpointing() { + public boolean shouldValidateSequenceNumberBeforeCheckpointing() { return validateSequenceNumberBeforeCheckpointing; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java index 5da33ac69..ac46be339 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java @@ -24,7 +24,7 @@ @Data public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { @NonNull - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; @Override public GetRecordsResult getRecords(final int maxRecords) { @@ -44,7 +44,7 @@ public boolean isShutdown() { } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index a69ea6ca1..33faa9963 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -137,7 +137,7 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -160,6 +160,9 @@ public class Worker implements Runnable { private final LeaseCleanupManager leaseCleanupManager; + // Shard Consumer Factory + private IShardConsumerFactory shardConsumerFactory; + /** * Constructor. * @@ -539,7 +542,7 @@ config, getStreamConfig(config, kinesisClient), leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), - leaderDecider, periodicShardSyncManager); + leaderDecider, periodicShardSyncManager, null /*Ishardconsumer*/); } Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, @@ -550,7 +553,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, - PeriodicShardSyncManager periodicShardSyncManager) { + PeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -580,6 +583,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); + this.shardConsumerFactory = shardConsumerFactory; } /** @@ -687,7 +691,7 @@ void runProcessLoop() { boolean foundCompletedShard = false; Set assignedShards = new HashSet<>(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { - ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); + IShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) { foundCompletedShard = true; } else { @@ -983,9 +987,9 @@ Callable createWorkerShutdownCallable() { ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { + if (consumer == null || KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { // // CASE1: There is a race condition between retrieving the current assignments, and creating the // notification. If the a lease is lost in between these two points, we explicitly decrement the @@ -1007,7 +1011,7 @@ boolean isShutdownComplete() { return shutdownComplete; } - ConcurrentMap getShardInfoShardConsumerMap() { + ConcurrentMap getShardInfoShardConsumerMap() { return shardInfoShardConsumerMap; } @@ -1107,8 +1111,8 @@ boolean shouldShutdown() { * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier // lease instance (and was shutdown). Don't need to create another @@ -1123,7 +1127,7 @@ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFact return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + protected IShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { final IRecordProcessor recordProcessor = processorFactory.createProcessor(); final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -1134,7 +1138,12 @@ protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFacto streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory); - return new ShardConsumer(shardInfo, + if(shardConsumerFactory == null){ + + shardConsumerFactory = new KinesisShardConsumerFactory(); + } + + return shardConsumerFactory.createShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, @@ -1146,7 +1155,6 @@ protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFacto metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, @@ -1224,8 +1232,8 @@ StreamConfig getStreamConfig() { * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. */ - private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, - KinesisClientLibConfiguration config) { + public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -1353,6 +1361,8 @@ public static class Builder { @Setter @Accessors(fluent = true) private IKinesisProxy kinesisProxy; @Setter @Accessors(fluent = true) + private IShardConsumerFactory shardConsumerFactory; + @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; @Setter @Accessors(fluent = true) private LeaseCleanupValidator leaseCleanupValidator; @@ -1421,6 +1431,10 @@ public Worker build() { throw new IllegalArgumentException( "Kinesis Client Library configuration needs to be provided to build Worker"); } + if(shardConsumerFactory == null){ + shardConsumerFactory = new KinesisShardConsumerFactory(); + } + if (recordProcessorFactory == null) { throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } @@ -1503,7 +1517,7 @@ public Worker build() { } // We expect users to either inject both LeaseRenewer and the corresponding thread-pool, or neither of them (DEFAULT). - if (leaseRenewer == null) { + if (leaseRenewer == null) { ExecutorService leaseRenewerThreadPool = LeaseCoordinator.getDefaultLeaseRenewalExecutorService(config.getMaxLeaseRenewalThreads()); leaseRenewer = new LeaseRenewer<>(leaseManager, config.getWorkerIdentifier(), config.getFailoverTimeMillis(), leaseRenewerThreadPool); } @@ -1512,7 +1526,6 @@ public Worker build() { leaderDecider = new DeterministicShuffleShardSyncLeaderDecider(leaseManager, Executors.newSingleThreadScheduledExecutor(), PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT); } - return new Worker(config.getApplicationName(), recordProcessorFactory, config, @@ -1546,7 +1559,8 @@ public Worker build() { workerStateChangeListener, shardSyncer, leaderDecider, - null /* PeriodicShardSyncManager */); + null /*PeriodicShardSyncManager*/, + shardConsumerFactory); } > R createClient(final T builder, diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index d19fc3ed0..ab254961f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -406,4 +406,4 @@ private static class CompletedShardResult { boolean cleanedUp; String failureMsg; } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 6a5e76b93..63cceb6e9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -14,8 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,7 +50,7 @@ public class ConsumerStatesTest { @Mock - private ShardConsumer consumer; + private KinesisShardConsumer consumer; @Mock private StreamConfig streamConfig; @Mock @@ -251,9 +251,9 @@ public void shutdownRequestState() { equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer))); assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification))); - assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.successTransition(), equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), - equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), @@ -266,7 +266,7 @@ public void shutdownRequestState() { @Test public void shutdownRequestCompleteStateTest() { - ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; + ConsumerState state = KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; assertThat(state.createTask(consumer), nullValue()); @@ -345,9 +345,9 @@ public void shutdownCompleteStateNullNotificationTest() { verify(shutdownNotification, never()).shutdownComplete(); } - static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, + static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, String propertyName, Matcher matcher) { - return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); + return taskWith(KinesisShutdownTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher shutdownReqTask( diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index 11e274cb9..2a3b27748 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -49,7 +49,7 @@ public class GracefulShutdownCoordinatorTest { @Mock private Callable contextCallable; @Mock - private ConcurrentMap shardInfoConsumerMap; + private ConcurrentMap shardInfoConsumerMap; @Test public void testAllShutdownCompletedAlready() throws Exception { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 7a5e7fd25..8936a28e1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; /** - * Unit tests of {@link ShardConsumer}. + * Unit tests of {@link KinesisShardConsumer}. */ @RunWith(MockitoJUnitRunner.class) public class ShardConsumerTest { @@ -160,8 +160,8 @@ public final void testInitializationStateUponFailure() throws Exception { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -175,19 +175,19 @@ public final void testInitializationStateUponFailure() throws Exception { config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } /** @@ -210,8 +210,8 @@ public final void testInitializationStateUponSubmissionFailure() throws Exceptio callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -226,21 +226,21 @@ public final void testInitializationStateUponSubmissionFailure() throws Exceptio shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class)); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } @Test @@ -258,8 +258,8 @@ public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -273,19 +273,19 @@ public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); } @@ -300,8 +300,8 @@ public final void testRecordProcessorThrowable() throws Exception { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -324,10 +324,10 @@ public final void testRecordProcessorThrowable() throws Exception { when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); // Throw Error when IRecordProcessor.initialize() is invoked. @@ -335,7 +335,7 @@ public final void testRecordProcessorThrowable() throws Exception { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -347,7 +347,7 @@ public final void testRecordProcessorThrowable() throws Exception { assertThat(e.getCause(), instanceOf(ExecutionException.class)); } Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -355,7 +355,7 @@ public final void testRecordProcessorThrowable() throws Exception { consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(2)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args @@ -363,11 +363,11 @@ public final void testRecordProcessorThrowable() throws Exception { // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } /** - * Test method for {@link ShardConsumer#consumeShard()} + * Test method for {@link KinesisShardConsumer#consumeShard()} */ @Test public final void testConsumeShard() throws Exception { @@ -420,8 +420,8 @@ public final void testConsumeShard() throws Exception { any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -440,11 +440,11 @@ public final void testConsumeShard() throws Exception { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -454,7 +454,7 @@ public final void testConsumeShard() throws Exception { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -469,21 +469,21 @@ public final void testConsumeShard() throws Exception { assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true)); Thread.sleep(50); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); verify(shutdownNotification).shutdownNotificationComplete(); assertThat(processor.isShutdownNotificationCalled(), equalTo(true)); consumer.consumeShard(); Thread.sleep(50); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); consumer.beginShutdown(); Thread.sleep(50L); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); consumer.consumeShard(); verify(shutdownNotification, atLeastOnce()).shutdownComplete(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); verify(getRecordsCache).shutdown(); @@ -524,8 +524,8 @@ public final void testShardConsumerShutdownWhenBlockedOnParent() throws Exceptio when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(streamConfig.getStreamProxy()).thenReturn(streamProxy); - final ShardConsumer consumer = - new ShardConsumer(shardInfo, + final KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -544,21 +544,21 @@ public final void testShardConsumerShutdownWhenBlockedOnParent() throws Exceptio shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(0)).getCheckpoint(); consumer.consumeShard(); // check on parent shards Thread.sleep(parentShardPollIntervalMillis * 2); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(1)).getCheckpoint(); consumer.notifyShutdownRequested(shutdownNotification); verify(shutdownNotification, times(0)).shutdownComplete(); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); Thread.sleep(50L); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.isShutdown(), is(true)); verify(shutdownNotification, times(1)).shutdownComplete(); consumer.beginShutdown(); @@ -583,7 +583,7 @@ public void shutdown(ShutdownInput input) { } /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures a transient error thrown from the record * processor's shutdown method with reason zombie will be retried. */ @Test @@ -646,8 +646,8 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -667,11 +667,11 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception shardSyncStrategy); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -681,7 +681,7 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -709,12 +709,12 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -732,7 +732,7 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown * reason TERMINATE when the shard end is reached. */ @Test @@ -795,8 +795,8 @@ public final void testConsumeShardWithShardEnd() throws Exception { metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -818,11 +818,11 @@ public final void testConsumeShardWithShardEnd() throws Exception { when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -832,7 +832,7 @@ public final void testConsumeShardWithShardEnd() throws Exception { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -860,12 +860,12 @@ public final void testConsumeShardWithShardEnd() throws Exception { // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -881,7 +881,7 @@ public final void testConsumeShardWithShardEnd() throws Exception { } /** - * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. + * Test method for {@link KinesisShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @Test public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { @@ -938,8 +938,8 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -958,11 +958,11 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -973,7 +973,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -985,9 +985,9 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); @@ -1014,8 +1014,8 @@ public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exce callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1041,22 +1041,22 @@ public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exce when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } @Test @@ -1069,8 +1069,8 @@ public void testCreateSynchronousGetRecordsRetrieval() { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1101,8 +1101,8 @@ public void testCreateAsynchronousGetRecordsRetrieval() { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1144,7 +1144,7 @@ public void testLongRunningTasks() throws InterruptedException { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = new ShardConsumer( + KinesisShardConsumer shardConsumer = new KinesisShardConsumer( shardInfo, streamConfig, checkpoint, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 053a8bf76..e8870d666 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -66,7 +66,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShutdownTask.RETRY_RANDOM_MAX_RANGE; /** * @@ -139,7 +139,7 @@ public void tearDown() throws Exception { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -148,7 +148,7 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -171,7 +171,7 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenCreatingLeaseThrows() throws Exception { @@ -183,7 +183,7 @@ public final void testCallWhenCreatingLeaseThrows() throws Exception { final String exceptionMessage = "InvalidStateException is thrown."; when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage)); - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -226,7 +226,7 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception { // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -252,7 +252,7 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception { } // Make next attempt with complete parent info in lease table - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -290,7 +290,7 @@ public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() thr when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); for (int i = 0; i < 10; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -315,7 +315,7 @@ public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() thr verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -351,7 +351,7 @@ public final void testCallWhenShardEnd() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -385,7 +385,7 @@ public final void testCallWhenShardNotFound() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(shardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(shardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -415,7 +415,7 @@ public final void testCallWhenLeaseLost() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.ZOMBIE, @@ -438,12 +438,12 @@ public final void testCallWhenLeaseLost() throws Exception { } /** - * Test method for {@link ShutdownTask#getTaskType()}. + * Test method for {@link KinesisShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ShutdownTask task = new ShutdownTask(null, null, null, null, + KinesisShutdownTask task = new KinesisShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 50a3aa9b5..75d741866 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -186,7 +186,7 @@ public class WorkerTest { @Mock private IRecordProcessor v2RecordProcessor; @Mock - private ShardConsumer shardConsumer; + private IShardConsumer shardConsumer; @Mock private Future taskFuture; @Mock @@ -297,13 +297,13 @@ public final void testCreateOrGetShardConsumer() { KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertNotNull(consumer); - ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertSame(consumer, consumer2); ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken = new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer3 = + IShardConsumer consumer3 = worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory); Assert.assertNotNull(consumer3); Assert.assertNotSame(consumer3, consumer); @@ -419,10 +419,10 @@ public final void testCleanupShardConsumers() { new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); - ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = + IShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); + IShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = worker.createOrGetShardConsumer(duplicateOfShardInfo1ButWithAnotherConcurrencyToken, streamletFactory); - ShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); + IShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); Set assignedShards = new HashSet(); assignedShards.add(shardInfo1); @@ -1219,11 +1219,11 @@ public void testShutdownDoesNotBlockOnCompletedLeases() throws Exception { false, shardPrioritization); - final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); + final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease); - final ShardConsumer completedShardConsumer = mock(ShardConsumer.class); + final KinesisShardConsumer completedShardConsumer = mock(KinesisShardConsumer.class); shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer); - when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); + when(completedShardConsumer.getCurrentState()).thenReturn(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); Callable callable = worker.createWorkerShutdownCallable(); assertThat(worker.hasGracefulShutdownStarted(), equalTo(false)); @@ -1338,11 +1338,11 @@ public List answer(InvocationOnMock invocation) throws Throwable { verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); } @@ -1451,11 +1451,11 @@ public List answer(InvocationOnMock invocation) throws Throwable { verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); @@ -2013,19 +2013,19 @@ public void describeTo(Description description) { @Override protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) { return Condition.matched(item, mismatchDescription) - .and(new Condition.Step() { + .and(new Condition.Step() { @Override - public Condition apply(MetricsCollectingTaskDecorator value, + public Condition apply(MetricsCollectingTaskDecorator value, Description mismatch) { - if (!(value.getOther() instanceof ShutdownTask)) { + if (!(value.getOther() instanceof KinesisShutdownTask)) { mismatch.appendText("Wrapped task isn't a shutdown task"); return Condition.notMatched(); } - return Condition.matched((ShutdownTask) value.getOther(), mismatch); + return Condition.matched((KinesisShutdownTask) value.getOther(), mismatch); } - }).and(new Condition.Step() { + }).and(new Condition.Step() { @Override - public Condition apply(ShutdownTask value, Description mismatch) { + public Condition apply(KinesisShutdownTask value, Description mismatch) { return Condition.matched(value.getReason(), mismatch); } }).matching(matcher);