diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index 7fb160730..47990226c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -45,24 +45,24 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie private static final int TIME_TO_KEEP_ALIVE = 5; private static final int CORE_THREAD_POOL_COUNT = 1; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final ExecutorService executorService; private final int retryGetRecordsInSeconds; private final String shardId; final Supplier> completionServiceSupplier; - public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(@NonNull final IDataFetcher dataFetcher, final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId); } - public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, + public AsynchronousGetRecordsRetrievalStrategy(final IDataFetcher dataFetcher, final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService), shardId); } - AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, + AsynchronousGetRecordsRetrievalStrategy(IDataFetcher dataFetcher, ExecutorService executorService, int retryGetRecordsInSeconds, Supplier> completionServiceSupplier, String shardId) { this.dataFetcher = dataFetcher; @@ -148,7 +148,7 @@ private static ExecutorService buildExector(int maxGetRecordsThreadPool, String } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java index 4f4c49b81..c9f9e6139 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockOnParentShardTask.java @@ -30,7 +30,7 @@ * If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly * proceed with processing data from the shard. */ -class BlockOnParentShardTask implements ITask { +public class BlockOnParentShardTask implements ITask { private static final Log LOG = LogFactory.getLog(BlockOnParentShardTask.class); private final ShardInfo shardInfo; @@ -45,9 +45,9 @@ class BlockOnParentShardTask implements ITask { * @param leaseManager Used to fetch the lease and checkpoint info for parent shards * @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing */ - BlockOnParentShardTask(ShardInfo shardInfo, - ILeaseManager leaseManager, - long parentShardPollIntervalMillis) { + public BlockOnParentShardTask(ShardInfo shardInfo, + ILeaseManager leaseManager, + long parentShardPollIntervalMillis) { this.shardInfo = shardInfo; this.leaseManager = leaseManager; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java index 71a153408..d831a4f1c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java @@ -46,9 +46,9 @@ public interface GetRecordsRetrievalStrategy { boolean isShutdown(); /** - * Returns the KinesisDataFetcher used to getRecords from Kinesis. + * Returns the IDataFetcher used to getRecords * - * @return KinesisDataFetcher + * @return IDataFetcher */ - KinesisDataFetcher getDataFetcher(); + IDataFetcher getDataFetcher(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java new file mode 100644 index 000000000..29f088d15 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IDataFetcher.java @@ -0,0 +1,23 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.model.ChildShard; + +import java.util.List; + +public interface IDataFetcher { + + DataFetcherResult getRecords(int maxRecords); + + void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream); + + void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream); + + void restartIterator(); + + boolean isShardEndReached(); + + List getChildShards(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IPeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IPeriodicShardSyncManager.java new file mode 100644 index 000000000..15bc007af --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IPeriodicShardSyncManager.java @@ -0,0 +1,28 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Value; +import lombok.experimental.Accessors; + +public interface IPeriodicShardSyncManager { + + TaskResult start(); + + /** + * Runs ShardSync once, without scheduling further periodic ShardSyncs. + * @return TaskResult from shard sync + */ + TaskResult syncShardsOnce(); + + void stop(); + + @Value + @Accessors(fluent = true) + @VisibleForTesting + class ShardSyncResponse { + private final boolean shouldDoShardSync; + private final boolean isHoleDetected; + private final String reasonForDecision; + } +} + diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java new file mode 100644 index 000000000..0cacd57a2 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumer.java @@ -0,0 +1,24 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +public interface IShardConsumer { + + boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist(); + + enum TaskOutcome { + SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND + } + + boolean consumeShard(); + + boolean isShutdown(); + + ShutdownReason getShutdownReason(); + + boolean beginShutdown(); + + void notifyShutdownRequested(ShutdownNotification shutdownNotification); + + KinesisConsumerStates.ShardConsumerState getCurrentState(); + + boolean isShutdownRequested(); +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java new file mode 100644 index 000000000..acb23018e --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/IShardConsumerFactory.java @@ -0,0 +1,34 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public interface IShardConsumerFactory { + + /** + * Returns a shard consumer to be used for consuming a (assigned) shard. + * + * @return Returns a shard consumer object. + */ + IShardConsumer createShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpointTracker, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesUponShardCompletion, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long taskBackoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java index fc2000a01..10c02b6ef 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ITask.java @@ -20,7 +20,7 @@ * Interface for shard processing tasks. * A task may execute an application callback (e.g. initialize, process, shutdown). */ -interface ITask extends Callable { +public interface ITask extends Callable { /** * Perform task logic. diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java index 5343470f8..7ee4ab27e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/InitializeTask.java @@ -29,7 +29,7 @@ /** * Task for initializing shard position and invoking the RecordProcessor initialize() API. */ -class InitializeTask implements ITask { +public class InitializeTask implements ITask { private static final Log LOG = LogFactory.getLog(InitializeTask.class); @@ -37,7 +37,7 @@ class InitializeTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.INITIALIZE; private final ICheckpoint checkpoint; private final RecordProcessorCheckpointer recordProcessorCheckpointer; @@ -49,14 +49,14 @@ class InitializeTask implements ITask { /** * Constructor. */ - InitializeTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - ICheckpoint checkpoint, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisDataFetcher dataFetcher, - long backoffTimeMillis, - StreamConfig streamConfig, - GetRecordsCache getRecordsCache) { + public InitializeTask(ShardInfo shardInfo, + IRecordProcessor recordProcessor, + ICheckpoint checkpoint, + RecordProcessorCheckpointer recordProcessorCheckpointer, + IDataFetcher dataFetcher, + long backoffTimeMillis, + StreamConfig streamConfig, + GetRecordsCache getRecordsCache) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.checkpoint = checkpoint; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index ccde83f35..104412346 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -20,6 +20,7 @@ import java.util.Set; import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import org.apache.commons.lang3.Validate; import com.amazonaws.ClientConfiguration; @@ -61,7 +62,7 @@ public class KinesisClientLibConfiguration { public static final int DEFAULT_MAX_RECORDS = 10000; /** - * The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to + * The default value for how long the {@link KinesisShardConsumer} should sleep if no records are returned from the call to * {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}. */ public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L; @@ -627,7 +628,7 @@ public KinesisClientLibConfiguration(String applicationName, * @param billingMode The DDB Billing mode to set for lease table creation. * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * {@link LeaseCleanupManager} * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases @@ -926,7 +927,7 @@ public boolean shouldCleanupLeasesUponShardCompletion() { } /** - * @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @return Interval in millis at which to run lease cleanup thread in {@link LeaseCleanupManager} */ public long leaseCleanupIntervalMillis() { return leaseCleanupIntervalMillis; @@ -1030,7 +1031,7 @@ public int getInitialLeaseTableWriteCapacity() { * Keeping it protected to forbid outside callers from depending on this internal object. * @return The initialPositionInStreamExtended object. */ - protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() { + public InitialPositionInStreamExtended getInitialPositionInStreamExtended() { return initialPositionInStreamExtended; } @@ -1623,7 +1624,7 @@ public KinesisClientLibConfiguration withMaxInitializationAttempts(int maxInitia /** * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in - * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * {@link LeaseCleanupManager} * @return */ public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java index 47baad044..ad55ab523 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibLeaseCoordinator.java @@ -51,7 +51,7 @@ /** * This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints. */ -class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { +public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator implements ICheckpoint { private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class); @@ -368,7 +368,7 @@ void runLeaseRenewer() throws DependencyException, InvalidStateException { * * @return LeaseManager */ - ILeaseManager getLeaseManager() { + public ILeaseManager getLeaseManager() { return leaseManager; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java similarity index 90% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java index 5cf55dbfe..766d6fbae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisConsumerStates.java @@ -15,7 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; /** - * Top level container for all the possible states a {@link ShardConsumer} can be in. The logic for creation of tasks, + * Top level container for all the possible states a {@link KinesisShardConsumer} can be in. The logic for creation of tasks, * and state transitions is contained within the {@link ConsumerState} objects. * *

State Diagram

@@ -64,12 +64,12 @@ * +-------------------+ * */ -class ConsumerStates { +public class KinesisConsumerStates { /** * Enumerates processing states when working on a shard. */ - enum ShardConsumerState { + public enum ShardConsumerState { // @formatter:off WAITING_ON_PARENT_SHARDS(new BlockedOnParentState()), INITIALIZING(new InitializingState()), @@ -96,7 +96,7 @@ public ConsumerState getConsumerState() { * do when a transition occurs. * */ - interface ConsumerState { + public interface ConsumerState { /** * Creates a new task for this state using the passed in consumer to build the task. If there is no task * required for this state it may return a null value. {@link ConsumerState}'s are allowed to modify the @@ -106,11 +106,11 @@ interface ConsumerState { * the consumer to use build the task, or execute state. * @return a valid task for this state or null if there is no task required. */ - ITask createTask(ShardConsumer consumer); + ITask createTask(KinesisShardConsumer consumer); /** * Provides the next state of the consumer upon success of the task return by - * {@link ConsumerState#createTask(ShardConsumer)}. + * {@link ConsumerState#createTask(KinesisShardConsumer)}. * * @return the next state that the consumer should transition to, this may be the same object as the current * state. @@ -129,7 +129,7 @@ interface ConsumerState { ConsumerState shutdownTransition(ShutdownReason shutdownReason); /** - * The type of task that {@link ConsumerState#createTask(ShardConsumer)} would return. This is always a valid state + * The type of task that {@link ConsumerState#createTask(KinesisShardConsumer)} would return. This is always a valid state * even if createTask would return a null value. * * @return the type of task that this state represents. @@ -149,7 +149,7 @@ interface ConsumerState { } /** - * The initial state that any {@link ShardConsumer} should start in. + * The initial state that any {@link KinesisShardConsumer} should start in. */ static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.getConsumerState(); @@ -187,7 +187,7 @@ private static ConsumerState shutdownStateFor(ShutdownReason reason) { static class BlockedOnParentState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new BlockOnParentShardTask(consumer.getShardInfo(), consumer.getLeaseManager(), consumer.getParentShardPollIntervalMillis()); } @@ -247,10 +247,10 @@ public boolean isTerminal() { * * */ - static class InitializingState implements ConsumerState { + public static class InitializingState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new InitializeTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getCheckpoint(), @@ -311,7 +311,7 @@ public boolean isTerminal() { static class ProcessingState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), @@ -358,10 +358,10 @@ public boolean isTerminal() { *

Valid Transitions

*
*
Success
- *
Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown.
+ *
Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown.
*
Shutdown
*
At this point records are being retrieved, and processed. An explicit shutdown will allow the record - * processor one last chance to checkpoint, and then the {@link ShardConsumer} will be held in an idle state. + * processor one last chance to checkpoint, and then the {@link KinesisShardConsumer} will be held in an idle state. *
*
{@link ShutdownReason#REQUESTED}
*
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, but the state implementation changes to @@ -377,7 +377,7 @@ public boolean isTerminal() { static class ShutdownNotificationState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return new ShutdownNotificationTask(consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getShutdownNotification(), @@ -414,24 +414,24 @@ public boolean isTerminal() { } /** - * Once the {@link ShutdownNotificationState} has been completed the {@link ShardConsumer} must not re-enter any of the - * processing states. This state idles the {@link ShardConsumer} until the worker triggers the final shutdown state. + * Once the {@link ShutdownNotificationState} has been completed the {@link KinesisShardConsumer} must not re-enter any of the + * processing states. This state idles the {@link KinesisShardConsumer} until the worker triggers the final shutdown state. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Remains in the {@link ShutdownNotificationCompletionState} *

*
*
Shutdown
- *
At this point the {@link ShardConsumer} has notified the record processor of the impending shutdown, and is + *
At this point the {@link KinesisShardConsumer} has notified the record processor of the impending shutdown, and is * waiting that notification. While waiting for the notification no further processing should occur on the - * {@link ShardConsumer}. + * {@link KinesisShardConsumer}. *
*
{@link ShutdownReason#REQUESTED}
*
Remains in the {@link ShardConsumerState#SHUTDOWN_REQUESTED}, and the state implementation remains @@ -447,7 +447,7 @@ public boolean isTerminal() { static class ShutdownNotificationCompletionState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { return null; } @@ -481,14 +481,14 @@ public boolean isTerminal() { } /** - * This state is entered if the {@link ShardConsumer} loses its lease, or reaches the end of the shard. + * This state is entered if the {@link KinesisShardConsumer} loses its lease, or reaches the end of the shard. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Transitions to the {@link ShutdownCompleteState} @@ -497,7 +497,7 @@ public boolean isTerminal() { *

Shutdown
*
At this point the record processor has processed the final shutdown indication, and depending on the shutdown * reason taken the correct course of action. From this point on there should be no more interactions with the - * record processor or {@link ShardConsumer}. + * record processor or {@link KinesisShardConsumer}. *
*
{@link ShutdownReason#REQUESTED}
*
@@ -519,8 +519,8 @@ public boolean isTerminal() { static class ShuttingDownState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { - return new ShutdownTask(consumer.getShardInfo(), + public ITask createTask(KinesisShardConsumer consumer) { + return new KinesisShutdownTask(consumer.getShardInfo(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getShutdownReason(), @@ -562,21 +562,21 @@ public boolean isTerminal() { } /** - * This is the final state for the {@link ShardConsumer}. This occurs once all shutdown activities are completed. + * This is the final state for the {@link KinesisShardConsumer}. This occurs once all shutdown activities are completed. * *

Valid Transitions

*
*
Success
*
*

- * Success shouldn't normally be called since the {@link ShardConsumer} is marked for shutdown. + * Success shouldn't normally be called since the {@link KinesisShardConsumer} is marked for shutdown. *

*

* Remains in the {@link ShutdownCompleteState} *

*
*
Shutdown
- *
At this point the all shutdown activites are completed, and the {@link ShardConsumer} should not take any + *
At this point the all shutdown activites are completed, and the {@link KinesisShardConsumer} should not take any * further actions. *
*
{@link ShutdownReason#REQUESTED}
@@ -599,7 +599,7 @@ public boolean isTerminal() { static class ShutdownCompleteState implements ConsumerState { @Override - public ITask createTask(ShardConsumer consumer) { + public ITask createTask(KinesisShardConsumer consumer) { if (consumer.getShutdownNotification() != null) { consumer.getShutdownNotification().shutdownComplete(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index ae4e321d4..4c8d638b9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -39,7 +39,7 @@ /** * Used to get data from Amazon Kinesis. Tracks iterator state internally. */ -class KinesisDataFetcher { +public class KinesisDataFetcher implements IDataFetcher{ private static final Log LOG = LogFactory.getLog(KinesisDataFetcher.class); @@ -185,7 +185,7 @@ public void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPosition * @param sequenceNumber advance the iterator to the record at this sequence number. * @param initialPositionInStream The initialPositionInStream. */ - void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { + public void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } else if (sequenceNumber.equals(SentinelCheckpoint.LATEST.toString())) { @@ -276,11 +276,11 @@ public void restartIterator() { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return isShardEndReached; } - protected List getChildShards() { + public List getChildShards() { return childShards; } @@ -290,5 +290,4 @@ protected List getChildShards() { String getNextIterator() { return nextIterator; } - } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisPeriodicShardSyncManager.java similarity index 92% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisPeriodicShardSyncManager.java index cdf73e824..991272917 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisPeriodicShardSyncManager.java @@ -46,7 +46,6 @@ import lombok.Getter; import lombok.NonNull; import lombok.Value; -import lombok.experimental.Accessors; import org.apache.commons.lang3.Validate; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,11 +60,12 @@ */ @Getter @EqualsAndHashCode -class PeriodicShardSyncManager { - private static final Log LOG = LogFactory.getLog(PeriodicShardSyncManager.class); +class KinesisPeriodicShardSyncManager implements IPeriodicShardSyncManager{ + private static final Log LOG = LogFactory.getLog(KinesisPeriodicShardSyncManager.class); private static final long INITIAL_DELAY = 0; /** DEFAULT interval is used for PERIODIC {@link ShardSyncStrategyType}. */ + private static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 1000L; /** Parameters for validating hash range completeness when running in auditor mode. */ @@ -89,30 +89,30 @@ class PeriodicShardSyncManager { private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold; - PeriodicShardSyncManager(String workerId, - LeaderDecider leaderDecider, - ShardSyncTask shardSyncTask, - IMetricsFactory metricsFactory, - ILeaseManager leaseManager, - IKinesisProxy kinesisProxy, - boolean isAuditorMode, - long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + KinesisPeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { this(workerId, leaderDecider, shardSyncTask, Executors.newSingleThreadScheduledExecutor(), metricsFactory, leaseManager, kinesisProxy, isAuditorMode, leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold); } - PeriodicShardSyncManager(String workerId, - LeaderDecider leaderDecider, - ShardSyncTask shardSyncTask, - ScheduledExecutorService shardSyncThreadPool, - IMetricsFactory metricsFactory, - ILeaseManager leaseManager, - IKinesisProxy kinesisProxy, - boolean isAuditorMode, - long leasesRecoveryAuditorExecutionFrequencyMillis, - int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { + KinesisPeriodicShardSyncManager(String workerId, + LeaderDecider leaderDecider, + ShardSyncTask shardSyncTask, + ScheduledExecutorService shardSyncThreadPool, + IMetricsFactory metricsFactory, + ILeaseManager leaseManager, + IKinesisProxy kinesisProxy, + boolean isAuditorMode, + long leasesRecoveryAuditorExecutionFrequencyMillis, + int leasesRecoveryAuditorInconsistencyConfidenceThreshold) { Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager."); Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager."); Validate.notNull(shardSyncTask, "ShardSyncTask is required to initialize PeriodicShardSyncManager."); @@ -134,6 +134,7 @@ class PeriodicShardSyncManager { } } + @Override public synchronized TaskResult start() { if (!isRunning) { final Runnable periodicShardSyncer = () -> { @@ -156,11 +157,13 @@ public synchronized TaskResult start() { * Runs ShardSync once, without scheduling further periodic ShardSyncs. * @return TaskResult from shard sync */ + @Override public synchronized TaskResult syncShardsOnce() { LOG.info("Syncing shards once from worker " + workerId); return metricsEmittingShardSyncTask.call(); } + @Override public void stop() { if (isRunning) { LOG.info(String.format("Shutting down leader decider on worker %s", workerId)); @@ -344,16 +347,6 @@ static List sortLeasesByHashRange(List l Collections.sort(leasesWithHashKeyRanges, new HashKeyRangeComparator()); return leasesWithHashKeyRanges; } - - @Value - @Accessors(fluent = true) - @VisibleForTesting - static class ShardSyncResponse { - private final boolean shouldDoShardSync; - private final boolean isHoleDetected; - private final String reasonForDecision; - } - @Value private static class HashRangeHole { private final HashKeyRangeForLease hashRangeAtStartOfPossibleHole; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java similarity index 81% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java index 11ee39b5d..07a2f9369 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumer.java @@ -36,16 +36,14 @@ import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.annotations.VisibleForTesting; -import lombok.Getter; - /** * Responsible for consuming data records of a (specified) shard. * The instance should be shutdown when we lose the primary responsibility for a shard. * A new instance should be created if the primary responsibility is reassigned back to this process. */ -class ShardConsumer { +public class KinesisShardConsumer implements IShardConsumer{ - private static final Log LOG = LogFactory.getLog(ShardConsumer.class); + private static final Log LOG = LogFactory.getLog(KinesisShardConsumer.class); private final StreamConfig streamConfig; private final IRecordProcessor recordProcessor; @@ -64,7 +62,7 @@ class ShardConsumer { private final long taskBackoffTimeMillis; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; - @Getter + //@Getter private final ShardSyncer shardSyncer; private ITask currentTask; @@ -72,16 +70,28 @@ class ShardConsumer { private Future future; private ShardSyncStrategy shardSyncStrategy; - @Getter + //@Getter private List childShards; - @Getter + //@Getter private final GetRecordsCache getRecordsCache; - private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - ShardInfo shardInfo) { + public List getChildShards() { + return childShards; + } + + public GetRecordsCache getGetRecordsCache() { + return getRecordsCache; + } + + public ShardSyncer getShardSyncer() { + return shardSyncer; + } + + private static final GetRecordsRetrievalStrategy makeStrategy(IDataFetcher dataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + ShardInfo shardInfo) { Optional getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry -> maxGetRecordsThreadPool.map(max -> new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId()))); @@ -93,7 +103,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do * much coordination/synchronization to handle concurrent reads/updates. */ - private ConsumerStates.ConsumerState currentState = ConsumerStates.INITIAL_STATE; + private KinesisConsumerStates.ConsumerState currentState = KinesisConsumerStates.INITIAL_STATE; /* * Used to track if we lost the primary responsibility. Once set to true, we will start shutting down. * If we regain primary responsibility before shutdown is complete, Worker should create a new ShardConsumer object. @@ -116,18 +126,18 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this(shardInfo, streamConfig, @@ -162,20 +172,20 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this( shardInfo, streamConfig, @@ -223,22 +233,22 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * @param shardSyncer shardSyncer instance used to check and create new leases */ @Deprecated - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, @@ -269,23 +279,23 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * @param shardSyncer shardSyncer instance used to check and create new leases * @param leaseCleanupManager used to clean up leases in lease table. */ - ShardConsumer(ShardInfo shardInfo, - StreamConfig streamConfig, - ICheckpoint checkpoint, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long parentShardPollIntervalMillis, - boolean cleanupLeasesOfCompletedShards, - ExecutorService executorService, - IMetricsFactory metricsFactory, - long backoffTimeMillis, - boolean skipShardSyncAtWorkerInitializationIfLeasesExist, - KinesisDataFetcher kinesisDataFetcher, - Optional retryGetRecordsInSeconds, - Optional maxGetRecordsThreadPool, - KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, - LeaseCleanupManager leaseCleanupManager) { + KinesisShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -314,7 +324,7 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher * * @return true if a new process task was submitted, false otherwise */ - synchronized boolean consumeShard() { + public synchronized boolean consumeShard() { return checkAndSubmitNextTask(); } @@ -373,9 +383,9 @@ public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { return skipShardSyncAtWorkerInitializationIfLeasesExist; } - private enum TaskOutcome { + /*public enum TaskOutcome { SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND - } + }*/ private TaskOutcome determineTaskOutcome() { try { @@ -423,7 +433,7 @@ private void logTaskException(TaskResult taskResult) { * * @param shutdownNotification used to signal that the record processor has been given the chance to shutdown. */ - void notifyShutdownRequested(ShutdownNotification shutdownNotification) { + public void notifyShutdownRequested(ShutdownNotification shutdownNotification) { this.shutdownNotification = shutdownNotification; markForShutdown(ShutdownReason.REQUESTED); } @@ -434,7 +444,7 @@ void notifyShutdownRequested(ShutdownNotification shutdownNotification) { * * @return true if shutdown is complete (false if shutdown is still in progress) */ - synchronized boolean beginShutdown() { + public synchronized boolean beginShutdown() { markForShutdown(ShutdownReason.ZOMBIE); checkAndSubmitNextTask(); @@ -454,14 +464,14 @@ synchronized void markForShutdown(ShutdownReason reason) { * * @return true if shutdown is complete */ - boolean isShutdown() { + public boolean isShutdown() { return currentState.isTerminal(); } /** * @return the shutdownReason */ - ShutdownReason getShutdownReason() { + public ShutdownReason getShutdownReason() { return shutdownReason; } @@ -497,7 +507,7 @@ void updateState(TaskOutcome taskOutcome) { } if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); - } else if (isShutdownRequested() && ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { + } else if (isShutdownRequested() && KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS.equals(currentState.getState())) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { @@ -516,7 +526,7 @@ void updateState(TaskOutcome taskOutcome) { } @VisibleForTesting - boolean isShutdownRequested() { + public boolean isShutdownRequested() { return shutdownReason != null; } @@ -525,7 +535,7 @@ boolean isShutdownRequested() { * * @return the currentState */ - ConsumerStates.ShardConsumerState getCurrentState() { + public KinesisConsumerStates.ShardConsumerState getCurrentState() { return currentState.getState(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java new file mode 100644 index 000000000..bbcae852c --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardConsumerFactory.java @@ -0,0 +1,48 @@ +package com.amazonaws.services.kinesis.clientlibrary.lib.worker; + +import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +public class KinesisShardConsumerFactory implements IShardConsumerFactory{ + + @Override + public IShardConsumer createShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpointTracker, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesUponShardCompletion, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long taskBackoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { + return new KinesisShardConsumer(shardInfo, + streamConfig, + checkpointTracker, + recordProcessor, + recordProcessorCheckpointer, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesUponShardCompletion, + executorService, + metricsFactory, + taskBackoffTimeMillis, + skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), + retryGetRecordsInSeconds, + maxGetRecordsThreadPool, + config, shardSyncer, shardSyncStrategy, + leaseCleanupManager); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java similarity index 94% rename from src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java rename to src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java index 07c9fff23..e71991722 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShutdownTask.java @@ -44,9 +44,9 @@ /** * Task for invoking the RecordProcessor shutdown() callback. */ -class ShutdownTask implements ITask { +public class KinesisShutdownTask implements ITask { - private static final Log LOG = LogFactory.getLog(ShutdownTask.class); + private static final Log LOG = LogFactory.getLog(KinesisShutdownTask.class); @VisibleForTesting static final int RETRY_RANDOM_MAX_RANGE = 50; @@ -72,19 +72,19 @@ class ShutdownTask implements ITask { * Constructor. */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES - ShutdownTask(ShardInfo shardInfo, - IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, - ShutdownReason reason, - IKinesisProxy kinesisProxy, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards, - KinesisClientLibLeaseCoordinator leaseCoordinator, - long backoffTimeMillis, - GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, - ShardSyncStrategy shardSyncStrategy, List childShards, - LeaseCleanupManager leaseCleanupManager) { + KinesisShutdownTask(ShardInfo shardInfo, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + ShutdownReason reason, + IKinesisProxy kinesisProxy, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesOfCompletedShards, + boolean ignoreUnexpectedChildShards, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long backoffTimeMillis, + GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, + ShardSyncStrategy shardSyncStrategy, List childShards, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java index 4f7703130..b7d1b0165 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/MetricsCollectingTaskDecorator.java @@ -21,7 +21,7 @@ /** * Decorates an ITask and reports metrics about its timing and success/failure. */ -class MetricsCollectingTaskDecorator implements ITask { +public class MetricsCollectingTaskDecorator implements ITask { private final ITask other; private final IMetricsFactory factory; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java index c85fbbef0..dce4bb028 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncStrategy.java @@ -6,9 +6,9 @@ */ class PeriodicShardSyncStrategy implements ShardSyncStrategy { - private PeriodicShardSyncManager periodicShardSyncManager; + private IPeriodicShardSyncManager periodicShardSyncManager; - PeriodicShardSyncStrategy(PeriodicShardSyncManager periodicShardSyncManager) { + PeriodicShardSyncStrategy(IPeriodicShardSyncManager periodicShardSyncManager) { this.periodicShardSyncManager = periodicShardSyncManager; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index a4cf74d88..3e4dbcb76 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -60,7 +60,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private PrefetchCounters prefetchCounters; private boolean started = false; private final String operation; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final String shardId; /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index cd543e23d..615874501 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -41,7 +41,7 @@ /** * Task for fetching data records and invoking processRecords() on the record processor instance. */ -class ProcessTask implements ITask { +public class ProcessTask implements ITask { private static final Log LOG = LogFactory.getLog(ProcessTask.class); @@ -55,7 +55,7 @@ class ProcessTask implements ITask { private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; private final RecordProcessorCheckpointer recordProcessorCheckpointer; - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; private final TaskType taskType = TaskType.PROCESS; private final StreamConfig streamConfig; private final long backoffTimeMillis; @@ -81,7 +81,7 @@ class ProcessTask implements ITask { * The retrieval strategy for fetching records from kinesis */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, GetRecordsCache getRecordsCache) { this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis, @@ -107,7 +107,7 @@ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProces * determines how throttling events should be reported in the log. */ public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor, - RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, + RecordProcessorCheckpointer recordProcessorCheckpointer, IDataFetcher dataFetcher, long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ThrottlingReporter throttlingReporter, GetRecordsCache getRecordsCache) { super(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java index dbee92189..16e2b317b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordProcessorCheckpointer.java @@ -37,7 +37,7 @@ * The Amazon Kinesis Client Library will instantiate an object and provide a reference to the application * RecordProcessor instance. Amazon Kinesis Client Library will create one instance per shard assignment. */ -class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { +public class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { private static final Log LOG = LogFactory.getLog(RecordProcessorCheckpointer.class); @@ -62,10 +62,10 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer { * @param checkpoint Used to checkpoint progress of a RecordProcessor * @param validator Used for validating sequence numbers */ - RecordProcessorCheckpointer(ShardInfo shardInfo, - ICheckpoint checkpoint, - SequenceNumberValidator validator, - IMetricsFactory metricsFactory) { + public RecordProcessorCheckpointer(ShardInfo shardInfo, + ICheckpoint checkpoint, + SequenceNumberValidator validator, + IMetricsFactory metricsFactory) { this.shardInfo = shardInfo; this.checkpoint = checkpoint; this.sequenceNumberValidator = validator; @@ -231,7 +231,7 @@ public synchronized IPreparedCheckpointer prepareCheckpoint(String sequenceNumbe /** * @return the lastCheckpointValue */ - ExtendedSequenceNumber getLastCheckpointValue() { + public ExtendedSequenceNumber getLastCheckpointValue() { return lastCheckpointValue; } @@ -244,14 +244,14 @@ synchronized void setInitialCheckpointValue(ExtendedSequenceNumber initialCheckp * * @return the largest permitted checkpoint */ - synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { + public synchronized ExtendedSequenceNumber getLargestPermittedCheckpointValue() { return largestPermittedCheckpointValue; } /** * @param largestPermittedCheckpointValue the largest permitted checkpoint */ - synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { + public synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber largestPermittedCheckpointValue) { this.largestPermittedCheckpointValue = largestPermittedCheckpointValue; } @@ -262,7 +262,7 @@ synchronized void setLargestPermittedCheckpointValue(ExtendedSequenceNumber larg * * @param extendedSequenceNumber */ - synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { + public synchronized void setSequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) { this.sequenceNumberAtShardEnd = extendedSequenceNumber; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java index 3ca28235c..ac81dc8b2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SequenceNumberValidator.java @@ -51,7 +51,7 @@ public class SequenceNumberValidator { * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers * being validated */ - SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { + public SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) { this.proxy = proxy; this.shardId = shardId; this.validateWithGetIterator = validateWithGetIterator; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java index 9efe2f510..d9b0810b3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardEndShardSyncStrategy.java @@ -17,10 +17,10 @@ class ShardEndShardSyncStrategy implements ShardSyncStrategy { private ShardSyncTaskManager shardSyncTaskManager; /** Runs periodic shard sync jobs in the background as an auditor process for shard-end syncs. */ - private PeriodicShardSyncManager periodicShardSyncManager; + private IPeriodicShardSyncManager periodicShardSyncManager; ShardEndShardSyncStrategy(ShardSyncTaskManager shardSyncTaskManager, - PeriodicShardSyncManager periodicShardSyncManager) { + IPeriodicShardSyncManager periodicShardSyncManager) { this.shardSyncTaskManager = shardSyncTaskManager; this.periodicShardSyncManager = periodicShardSyncManager; } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java index 13c43b0e7..8c3dcd0ce 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncTask.java @@ -30,7 +30,7 @@ * It will clean up leases/activities for shards that have been completely processed (if * cleanupLeasesUponShardCompletion is true). */ -class ShardSyncTask implements ITask { +public class ShardSyncTask implements ITask { private static final Log LOG = LogFactory.getLog(ShardSyncTask.class); @@ -56,13 +56,13 @@ class ShardSyncTask implements ITask { * @param shardSyncer shardSyncer instance used to check and create new leases * @param latestShards latest snapshot of shards to reuse */ - ShardSyncTask(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, - boolean cleanupLeasesUponShardCompletion, - boolean ignoreUnexpectedChildShards, - long shardSyncTaskIdleTimeMillis, - ShardSyncer shardSyncer, List latestShards) { + public ShardSyncTask(IKinesisProxy kinesisProxy, + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPositionInStream, + boolean cleanupLeasesUponShardCompletion, + boolean ignoreUnexpectedChildShards, + long shardSyncTaskIdleTimeMillis, + ShardSyncer shardSyncer, List latestShards) { this.latestShards = latestShards; this.kinesisProxy = kinesisProxy; this.leaseManager = leaseManager; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java index ecf5041f9..98e4702e5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownNotificationTask.java @@ -21,14 +21,14 @@ /** * Notifies record processor of incoming shutdown request, and gives them a chance to checkpoint. */ -class ShutdownNotificationTask implements ITask { +public class ShutdownNotificationTask implements ITask { private final IRecordProcessor recordProcessor; private final IRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShutdownNotification shutdownNotification; private final ShardInfo shardInfo; - ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { + public ShutdownNotificationTask(IRecordProcessor recordProcessor, IRecordProcessorCheckpointer recordProcessorCheckpointer, ShutdownNotification shutdownNotification, ShardInfo shardInfo) { this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; this.shutdownNotification = shutdownNotification; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java index 5e29d6ddb..c326e361d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownReason.java @@ -15,8 +15,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; /** @@ -72,7 +72,7 @@ public boolean canTransitionTo(ShutdownReason reason) { return reason.rank > this.rank; } - ConsumerState getShutdownState() { + public ConsumerState getShutdownState() { return shutdownState; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java index fff6b71f6..547f1d12c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/StreamConfig.java @@ -19,7 +19,7 @@ /** * Used to capture stream configuration and pass it along. */ -class StreamConfig { +public class StreamConfig { private final IKinesisProxy streamProxy; private final int maxRecords; @@ -38,11 +38,11 @@ class StreamConfig { * @param initialPositionInStream Initial position in stream */ StreamConfig(IKinesisProxy proxy, - int maxRecords, - long idleTimeInMilliseconds, - boolean callProcessRecordsEvenForEmptyRecordList, - boolean validateSequenceNumberBeforeCheckpointing, - InitialPositionInStreamExtended initialPositionInStream) { + int maxRecords, + long idleTimeInMilliseconds, + boolean callProcessRecordsEvenForEmptyRecordList, + boolean validateSequenceNumberBeforeCheckpointing, + InitialPositionInStreamExtended initialPositionInStream) { this.streamProxy = proxy; this.maxRecords = maxRecords; this.idleTimeInMilliseconds = idleTimeInMilliseconds; @@ -54,7 +54,7 @@ class StreamConfig { /** * @return the streamProxy */ - IKinesisProxy getStreamProxy() { + public IKinesisProxy getStreamProxy() { return streamProxy; } @@ -82,14 +82,14 @@ boolean shouldCallProcessRecordsEvenForEmptyRecordList() { /** * @return the initialPositionInStream */ - InitialPositionInStreamExtended getInitialPositionInStream() { + public InitialPositionInStreamExtended getInitialPositionInStream() { return initialPositionInStream; } /** * @return validateSequenceNumberBeforeCheckpointing */ - boolean shouldValidateSequenceNumberBeforeCheckpointing() { + public boolean shouldValidateSequenceNumberBeforeCheckpointing() { return validateSequenceNumberBeforeCheckpointing; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java index 5da33ac69..ac46be339 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java @@ -24,7 +24,7 @@ @Data public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrievalStrategy { @NonNull - private final KinesisDataFetcher dataFetcher; + private final IDataFetcher dataFetcher; @Override public GetRecordsResult getRecords(final int maxRecords) { @@ -44,7 +44,7 @@ public boolean isShutdown() { } @Override - public KinesisDataFetcher getDataFetcher() { + public IDataFetcher getDataFetcher() { return dataFetcher; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java index db22c97b1..502d10f09 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/TaskResult.java @@ -22,7 +22,7 @@ * Used to capture information from a task that we want to communicate back to the higher layer. * E.g. exception thrown when executing the task, if we reach end of a shard. */ -class TaskResult { +public class TaskResult { // Did we reach the end of the shard while processing this task. private boolean shardEndReached; @@ -38,7 +38,7 @@ class TaskResult { /** * @return the shardEndReached */ - protected boolean isShardEndReached() { + public boolean isShardEndReached() { return shardEndReached; } @@ -77,7 +77,7 @@ public Exception getException() { /** * @param e Any exception encountered when running the process task. */ - TaskResult(Exception e) { + public TaskResult(Exception e) { this(e, false); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index a69ea6ca1..4ec858d85 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -137,7 +137,7 @@ public class Worker implements Runnable { // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); + private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; @@ -156,10 +156,13 @@ public class Worker implements Runnable { // Periodic Shard Sync related fields private LeaderDecider leaderDecider; private ShardSyncStrategy shardSyncStrategy; - private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; + private IPeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; private final LeaseCleanupManager leaseCleanupManager; + // Shard Consumer Factory + private IShardConsumerFactory shardConsumerFactory; + /** * Constructor. * @@ -533,13 +536,13 @@ config, getStreamConfig(config, kinesisClient), IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, - LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, PeriodicShardSyncManager periodicShardSyncManager) { + LeaseCleanupValidator leaseCleanupValidator, LeaderDecider leaderDecider, IPeriodicShardSyncManager periodicShardSyncManager) { this(applicationName, recordProcessorFactory, config, streamConfig, initialPositionInStream, parentShardPollIntervalMillis, shardSyncIdleTimeMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, shardPrioritization, retryGetRecordsInSeconds, maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(leaseCleanupValidator), - leaderDecider, periodicShardSyncManager); + leaderDecider, periodicShardSyncManager, null /*ShardConsumerFactory*/); } Worker(String applicationName, IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, @@ -550,7 +553,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization, Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, WorkerStateChangeListener workerStateChangeListener, ShardSyncer shardSyncer, LeaderDecider leaderDecider, - PeriodicShardSyncManager periodicShardSyncManager) { + IPeriodicShardSyncManager periodicShardSyncManager, IShardConsumerFactory shardConsumerFactory) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; this.config = config; @@ -576,6 +579,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); + this.shardConsumerFactory = shardConsumerFactory; this.leaseCleanupManager = LeaseCleanupManager.newInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), @@ -590,7 +594,7 @@ maxGetRecordsThreadPool, workerStateChangeListener, new KinesisShardSyncer(lease */ private void createShardSyncStrategy(ShardSyncStrategyType strategyType, LeaderDecider leaderDecider, - PeriodicShardSyncManager periodicShardSyncManager) { + IPeriodicShardSyncManager periodicShardSyncManager) { switch (strategyType) { case PERIODIC: this.leaderDecider = getOrCreateLeaderDecider(leaderDecider); @@ -652,7 +656,7 @@ LeaderDecider getLeaderDecider() { /** * @return the leaderElectedPeriodicShardSyncManager */ - PeriodicShardSyncManager getPeriodicShardSyncManager() { + IPeriodicShardSyncManager getPeriodicShardSyncManager() { return leaderElectedPeriodicShardSyncManager; } @@ -687,7 +691,7 @@ void runProcessLoop() { boolean foundCompletedShard = false; Set assignedShards = new HashSet<>(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { - ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); + IShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); if (shardConsumer.isShutdown() && shardConsumer.getShutdownReason().equals(ShutdownReason.TERMINATE)) { foundCompletedShard = true; } else { @@ -695,10 +699,8 @@ void runProcessLoop() { } assignedShards.add(shardInfo); } - // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); - wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); } catch (Exception e) { @@ -983,9 +985,9 @@ Callable createWorkerShutdownCallable() { ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, lease, notificationCompleteLatch, shutdownCompleteLatch); ShardInfo shardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(lease); - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); - if (consumer == null || ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { + if (consumer == null || KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE.equals(consumer.getCurrentState())) { // // CASE1: There is a race condition between retrieving the current assignments, and creating the // notification. If the a lease is lost in between these two points, we explicitly decrement the @@ -1007,7 +1009,7 @@ boolean isShutdownComplete() { return shutdownComplete; } - ConcurrentMap getShardInfoShardConsumerMap() { + ConcurrentMap getShardInfoShardConsumerMap() { return shardInfoShardConsumerMap; } @@ -1107,8 +1109,8 @@ boolean shouldShutdown() { * RecordProcessor factory * @return ShardConsumer for the shard */ - ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { - ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); + IShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + IShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier // lease instance (and was shutdown). Don't need to create another @@ -1123,7 +1125,7 @@ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFact return consumer; } - protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { + protected IShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { final IRecordProcessor recordProcessor = processorFactory.createProcessor(); final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( shardInfo, @@ -1134,7 +1136,11 @@ protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFacto streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), metricsFactory); - return new ShardConsumer(shardInfo, + if(shardConsumerFactory == null){ //Default to KinesisShardConsumerFactory if null + this.shardConsumerFactory = new KinesisShardConsumerFactory(); + } + + return shardConsumerFactory.createShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, @@ -1146,7 +1152,6 @@ protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFacto metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, @@ -1224,8 +1229,8 @@ StreamConfig getStreamConfig() { * KinesisClientLibConfiguration * @return Returns metrics factory based on the config. */ - private static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, - KinesisClientLibConfiguration config) { + public static IMetricsFactory getMetricsFactory(AmazonCloudWatch cloudWatchClient, + KinesisClientLibConfiguration config) { IMetricsFactory metricsFactory; if (config.getMetricsLevel() == MetricsLevel.NONE) { metricsFactory = new NullMetricsFactory(); @@ -1278,13 +1283,13 @@ private LeaderDecider getOrCreateLeaderDecider(LeaderDecider leaderDecider) { /** A non-null PeriodicShardSyncManager can only provided from unit tests. Any application code will create the * PeriodicShardSyncManager for the first time here. */ - private PeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(PeriodicShardSyncManager periodicShardSyncManager, - boolean isAuditorMode) { + private IPeriodicShardSyncManager getOrCreatePeriodicShardSyncManager(IPeriodicShardSyncManager periodicShardSyncManager, + boolean isAuditorMode) { if (periodicShardSyncManager != null) { return periodicShardSyncManager; } - return new PeriodicShardSyncManager(config.getWorkerIdentifier(), + return new KinesisPeriodicShardSyncManager(config.getWorkerIdentifier(), leaderDecider, new ShardSyncTask(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), @@ -1353,6 +1358,10 @@ public static class Builder { @Setter @Accessors(fluent = true) private IKinesisProxy kinesisProxy; @Setter @Accessors(fluent = true) + private IPeriodicShardSyncManager periodicShardSyncManager; + @Setter @Accessors(fluent = true) + private IShardConsumerFactory shardConsumerFactory; + @Setter @Accessors(fluent = true) private WorkerStateChangeListener workerStateChangeListener; @Setter @Accessors(fluent = true) private LeaseCleanupValidator leaseCleanupValidator; @@ -1421,6 +1430,15 @@ public Worker build() { throw new IllegalArgumentException( "Kinesis Client Library configuration needs to be provided to build Worker"); } + if (periodicShardSyncManager != null) { + if (leaseManager == null || shardSyncer == null || metricsFactory == null || leaderDecider == null) { + + throw new IllegalArgumentException("LeaseManager, ShardSyncer, MetricsFactory, and LeaderDecider must be provided if PeriodicShardSyncManager is provided"); + } + } + if(shardConsumerFactory == null){ + shardConsumerFactory = new KinesisShardConsumerFactory(); + } if (recordProcessorFactory == null) { throw new IllegalArgumentException("A Record Processor Factory needs to be provided to build Worker"); } @@ -1546,7 +1564,8 @@ public Worker build() { workerStateChangeListener, shardSyncer, leaderDecider, - null /* PeriodicShardSyncManager */); + periodicShardSyncManager, + shardConsumerFactory); } > R createClient(final T builder, diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java index d19fc3ed0..dfb44cf8d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -114,7 +114,7 @@ public void start() { completedLeaseStopwatch.start(); garbageLeaseStopwatch.start(); deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS); isRunning = true; } else { LOG.info("Lease cleanup thread already running, no need to start."); @@ -241,7 +241,7 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { // throws ResourceNotFoundException wereChildShardsPresent = !CollectionUtils - .isNullOrEmpty(getChildShardsFromService(shardInfo)); + .isNullOrEmpty(getChildShardsFromService(shardInfo)); } } catch (ResourceNotFoundException e) { wasResourceNotFound = true; @@ -296,7 +296,7 @@ private CompletedShardResult cleanupLeaseForCompletedShard(KinesisClientLease le for (String childShardLeaseKey : childShardLeaseKeys) { final KinesisClientLease childShardLease = Optional.ofNullable( - leaseManager.getLease(childShardLeaseKey)) + leaseManager.getLease(childShardLeaseKey)) .orElseThrow(() -> new IllegalStateException( "Child lease " + childShardLeaseKey + " for completed shard not found in " + "lease table - not cleaning up lease " + lease)); @@ -406,4 +406,4 @@ private static class CompletedShardResult { boolean cleanedUp; String failureMsg; } -} +} \ No newline at end of file diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index 6a5e76b93..63cceb6e9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -14,8 +14,8 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ConsumerState; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisConsumerStates.ShardConsumerState; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -50,7 +50,7 @@ public class ConsumerStatesTest { @Mock - private ShardConsumer consumer; + private KinesisShardConsumer consumer; @Mock private StreamConfig streamConfig; @Mock @@ -251,9 +251,9 @@ public void shutdownRequestState() { equalTo((IRecordProcessorCheckpointer) recordProcessorCheckpointer))); assertThat(task, shutdownReqTask(ShutdownNotification.class, "shutdownNotification", equalTo(shutdownNotification))); - assertThat(state.successTransition(), equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + assertThat(state.successTransition(), equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.REQUESTED), - equalTo(ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); + equalTo(KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE)); assertThat(state.shutdownTransition(ShutdownReason.ZOMBIE), equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); assertThat(state.shutdownTransition(ShutdownReason.TERMINATE), @@ -266,7 +266,7 @@ public void shutdownRequestState() { @Test public void shutdownRequestCompleteStateTest() { - ConsumerState state = ConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; + ConsumerState state = KinesisConsumerStates.SHUTDOWN_REQUEST_COMPLETION_STATE; assertThat(state.createTask(consumer), nullValue()); @@ -345,9 +345,9 @@ public void shutdownCompleteStateNullNotificationTest() { verify(shutdownNotification, never()).shutdownComplete(); } - static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, + static ReflectionPropertyMatcher shutdownTask(Class valueTypeClass, String propertyName, Matcher matcher) { - return taskWith(ShutdownTask.class, valueTypeClass, propertyName, matcher); + return taskWith(KinesisShutdownTask.class, valueTypeClass, propertyName, matcher); } static ReflectionPropertyMatcher shutdownReqTask( diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java index 11e274cb9..2a3b27748 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GracefulShutdownCoordinatorTest.java @@ -49,7 +49,7 @@ public class GracefulShutdownCoordinatorTest { @Mock private Callable contextCallable; @Mock - private ConcurrentMap shardInfoConsumerMap; + private ConcurrentMap shardInfoConsumerMap; @Test public void testAllShutdownCompletedAlready() throws Exception { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java index 779ba92f5..016ab32e9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PeriodicShardSyncManagerTest.java @@ -39,8 +39,8 @@ import java.util.List; import java.util.stream.Collectors; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MAX_HASH_KEY; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.PeriodicShardSyncManager.MIN_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MAX_HASH_KEY; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisPeriodicShardSyncManager.MIN_HASH_KEY; import static com.amazonaws.services.kinesis.leases.impl.HashKeyRangeForLease.deserialize; import static org.mockito.Mockito.when; @@ -52,10 +52,10 @@ public class PeriodicShardSyncManagerTest { public static final int LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD = 3; /** Manager for PERIODIC shard sync strategy */ - private PeriodicShardSyncManager periodicShardSyncManager; + private KinesisPeriodicShardSyncManager periodicShardSyncManager; /** Manager for SHARD_END shard sync strategy */ - private PeriodicShardSyncManager auditorPeriodicShardSyncManager; + private KinesisPeriodicShardSyncManager auditorPeriodicShardSyncManager; @Mock private LeaderDecider leaderDecider; @@ -70,10 +70,10 @@ public class PeriodicShardSyncManagerTest { @Before public void setup() { - periodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + periodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, false, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); - auditorPeriodicShardSyncManager = new PeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, + auditorPeriodicShardSyncManager = new KinesisPeriodicShardSyncManager(WORKER_ID, leaderDecider, shardSyncTask, metricsFactory, leaseManager, kinesisProxy, true, LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS, LEASES_RECOVERY_AUDITOR_INCONSISTENCY_CONFIDENCE_THRESHOLD); } @@ -92,7 +92,7 @@ public void testForFailureWhenHashRangesAreIncomplete() { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertTrue(PeriodicShardSyncManager + Assert.assertTrue(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -110,7 +110,7 @@ public void testForSuccessWhenHashRangesAreComplete() { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -128,7 +128,7 @@ public void testForSuccessWhenUnsortedHashRangesAreComplete() { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } @@ -147,7 +147,7 @@ public void testForSuccessWhenHashRangesAreCompleteForOverlappingLeasesAtEnd() { lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); return lease; }).collect(Collectors.toList()); - Assert.assertFalse(PeriodicShardSyncManager + Assert.assertFalse(KinesisPeriodicShardSyncManager .checkForHoleInHashKeyRanges(hashRanges).isPresent()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 7a5e7fd25..8936a28e1 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -85,7 +85,7 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; /** - * Unit tests of {@link ShardConsumer}. + * Unit tests of {@link KinesisShardConsumer}. */ @RunWith(MockitoJUnitRunner.class) public class ShardConsumerTest { @@ -160,8 +160,8 @@ public final void testInitializationStateUponFailure() throws Exception { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -175,19 +175,19 @@ public final void testInitializationStateUponFailure() throws Exception { config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } /** @@ -210,8 +210,8 @@ public final void testInitializationStateUponSubmissionFailure() throws Exceptio callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -226,21 +226,21 @@ public final void testInitializationStateUponSubmissionFailure() throws Exceptio shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); doThrow(new RejectedExecutionException()).when(spyExecutorService).submit(any(InitializeTask.class)); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); } @Test @@ -258,8 +258,8 @@ public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -273,19 +273,19 @@ public void testInitializationStateTransitionsToShutdownOnLeaseNotFound() throws config, shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); } @@ -300,8 +300,8 @@ public final void testRecordProcessorThrowable() throws Exception { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -324,10 +324,10 @@ public final void testRecordProcessorThrowable() throws Exception { when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); // Throw Error when IRecordProcessor.initialize() is invoked. @@ -335,7 +335,7 @@ public final void testRecordProcessorThrowable() throws Exception { consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -347,7 +347,7 @@ public final void testRecordProcessorThrowable() throws Exception { assertThat(e.getCause(), instanceOf(ExecutionException.class)); } Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); @@ -355,7 +355,7 @@ public final void testRecordProcessorThrowable() throws Exception { consumer.consumeShard(); // submit InitializeTask again. Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(2)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(2)).initialize(any(InitializationInput.class)); // no other calls with different args @@ -363,11 +363,11 @@ public final void testRecordProcessorThrowable() throws Exception { // Checking the status of submitted InitializeTask from above should pass. consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } /** - * Test method for {@link ShardConsumer#consumeShard()} + * Test method for {@link KinesisShardConsumer#consumeShard()} */ @Test public final void testConsumeShard() throws Exception { @@ -420,8 +420,8 @@ public final void testConsumeShard() throws Exception { any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -440,11 +440,11 @@ public final void testConsumeShard() throws Exception { shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -454,7 +454,7 @@ public final void testConsumeShard() throws Exception { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -469,21 +469,21 @@ public final void testConsumeShard() throws Exception { assertThat(processor.getNotifyShutdownLatch().await(1, TimeUnit.SECONDS), is(true)); Thread.sleep(50); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); verify(shutdownNotification).shutdownNotificationComplete(); assertThat(processor.isShutdownNotificationCalled(), equalTo(true)); consumer.consumeShard(); Thread.sleep(50); - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_REQUESTED)); consumer.beginShutdown(); Thread.sleep(50L); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.ZOMBIE)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); consumer.consumeShard(); verify(shutdownNotification, atLeastOnce()).shutdownComplete(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); verify(getRecordsCache).shutdown(); @@ -524,8 +524,8 @@ public final void testShardConsumerShutdownWhenBlockedOnParent() throws Exceptio when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(streamConfig.getStreamProxy()).thenReturn(streamProxy); - final ShardConsumer consumer = - new ShardConsumer(shardInfo, + final KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -544,21 +544,21 @@ public final void testShardConsumerShutdownWhenBlockedOnParent() throws Exceptio shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(0)).getCheckpoint(); consumer.consumeShard(); // check on parent shards Thread.sleep(parentShardPollIntervalMillis * 2); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(parentLease, times(1)).getCheckpoint(); consumer.notifyShutdownRequested(shutdownNotification); verify(shutdownNotification, times(0)).shutdownComplete(); assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); Thread.sleep(50L); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.isShutdown(), is(true)); verify(shutdownNotification, times(1)).shutdownComplete(); consumer.beginShutdown(); @@ -583,7 +583,7 @@ public void shutdown(ShutdownInput input) { } /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures a transient error thrown from the record * processor's shutdown method with reason zombie will be retried. */ @Test @@ -646,8 +646,8 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -667,11 +667,11 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception shardSyncStrategy); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -681,7 +681,7 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -709,12 +709,12 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -732,7 +732,7 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception /** - * Test method for {@link ShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown + * Test method for {@link KinesisShardConsumer#consumeShard()} that ensures the shardConsumer gets shutdown with shutdown * reason TERMINATE when the shard end is reached. */ @Test @@ -795,8 +795,8 @@ public final void testConsumeShardWithShardEnd() throws Exception { metricsFactory ); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -818,11 +818,11 @@ public final void testConsumeShardWithShardEnd() throws Exception { when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize processor.getInitializeLatch().await(5, TimeUnit.SECONDS); verify(getRecordsCache).start(); @@ -832,7 +832,7 @@ public final void testConsumeShardWithShardEnd() throws Exception { boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -860,12 +860,12 @@ public final void testConsumeShardWithShardEnd() throws Exception { // Wait for shutdown complete now that terminate shutdown is successful for (int i = 0; i < 100; i++) { consumer.consumeShard(); - if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + if (consumer.getCurrentState() == KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { break; } Thread.sleep(50L); } - assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + assertThat(consumer.getCurrentState(), equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); @@ -881,7 +881,7 @@ public final void testConsumeShardWithShardEnd() throws Exception { } /** - * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. + * Test method for {@link KinesisShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @Test public final void testConsumeShardWithInitialPositionAtTimestamp() throws Exception { @@ -938,8 +938,8 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -958,11 +958,11 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except shardSyncer, shardSyncStrategy); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards Thread.sleep(50L); consumer.consumeShard(); // start initialization - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); consumer.consumeShard(); // initialize Thread.sleep(50L); @@ -973,7 +973,7 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except boolean newTaskSubmitted = consumer.consumeShard(); if (newTaskSubmitted) { LOG.debug("New processing task was submitted, call # " + i); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES i += maxRecords; } @@ -985,9 +985,9 @@ public final void testConsumeShardWithInitialPositionAtTimestamp() throws Except assertThat(processor.getShutdownReason(), nullValue()); consumer.beginShutdown(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTTING_DOWN))); consumer.beginShutdown(); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.ZOMBIE))); executorService.shutdown(); @@ -1014,8 +1014,8 @@ public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exce callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer consumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer consumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1041,22 +1041,22 @@ public final void testConsumeShardInitializedWithPendingCheckpoint() throws Exce when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // submit BlockOnParentShardTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); verify(processor, times(0)).initialize(any(InitializationInput.class)); consumer.consumeShard(); // submit InitializeTask Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.INITIALIZING))); verify(processor, times(1)).initialize(argThat( initializationInputMatcher(checkpointSequenceNumber, pendingCheckpointSequenceNumber))); verify(processor, times(1)).initialize(any(InitializationInput.class)); // no other calls with different args consumer.consumeShard(); Thread.sleep(50L); - assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + assertThat(consumer.getCurrentState(), is(equalTo(KinesisConsumerStates.ShardConsumerState.PROCESSING))); } @Test @@ -1069,8 +1069,8 @@ public void testCreateSynchronousGetRecordsRetrieval() { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1101,8 +1101,8 @@ public void testCreateAsynchronousGetRecordsRetrieval() { callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = - new ShardConsumer(shardInfo, + KinesisShardConsumer shardConsumer = + new KinesisShardConsumer(shardInfo, streamConfig, checkpoint, processor, @@ -1144,7 +1144,7 @@ public void testLongRunningTasks() throws InterruptedException { skipCheckpointValidationValue, INITIAL_POSITION_LATEST); - ShardConsumer shardConsumer = new ShardConsumer( + KinesisShardConsumer shardConsumer = new KinesisShardConsumer( shardInfo, streamConfig, checkpoint, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 053a8bf76..e8870d666 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -66,7 +66,7 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisShutdownTask.RETRY_RANDOM_MAX_RANGE; /** * @@ -139,7 +139,7 @@ public void tearDown() throws Exception { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { @@ -148,7 +148,7 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -171,7 +171,7 @@ public final void testCallWhenApplicationDoesNotCheckpoint() { } /** - * Test method for {@link ShutdownTask#call()}. + * Test method for {@link KinesisShutdownTask#call()}. */ @Test public final void testCallWhenCreatingLeaseThrows() throws Exception { @@ -183,7 +183,7 @@ public final void testCallWhenCreatingLeaseThrows() throws Exception { final String exceptionMessage = "InvalidStateException is thrown."; when(leaseManager.createLeaseIfNotExists(any(KinesisClientLease.class))).thenThrow(new InvalidStateException(exceptionMessage)); - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -226,7 +226,7 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception { // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -252,7 +252,7 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception { } // Make next attempt with complete parent info in lease table - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -290,7 +290,7 @@ public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() thr when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); for (int i = 0; i < 10; i++) { - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -315,7 +315,7 @@ public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() thr verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } - ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = spy(new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -351,7 +351,7 @@ public final void testCallWhenShardEnd() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -385,7 +385,7 @@ public final void testCallWhenShardNotFound() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(shardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(shardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.TERMINATE, @@ -415,7 +415,7 @@ public final void testCallWhenLeaseLost() throws Exception { boolean cleanupLeasesOfCompletedShards = false; boolean ignoreUnexpectedChildShards = false; - ShutdownTask task = new ShutdownTask(defaultShardInfo, + KinesisShutdownTask task = new KinesisShutdownTask(defaultShardInfo, defaultRecordProcessor, checkpointer, ShutdownReason.ZOMBIE, @@ -438,12 +438,12 @@ public final void testCallWhenLeaseLost() throws Exception { } /** - * Test method for {@link ShutdownTask#getTaskType()}. + * Test method for {@link KinesisShutdownTask#getTaskType()}. */ @Test public final void testGetTaskType() { KinesisClientLibLeaseCoordinator leaseCoordinator = mock(KinesisClientLibLeaseCoordinator.class); - ShutdownTask task = new ShutdownTask(null, null, null, null, + KinesisShutdownTask task = new KinesisShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 50a3aa9b5..75d741866 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -186,7 +186,7 @@ public class WorkerTest { @Mock private IRecordProcessor v2RecordProcessor; @Mock - private ShardConsumer shardConsumer; + private IShardConsumer shardConsumer; @Mock private Future taskFuture; @Mock @@ -297,13 +297,13 @@ public final void testCreateOrGetShardConsumer() { KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertNotNull(consumer); - ShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); + IShardConsumer consumer2 = worker.createOrGetShardConsumer(shardInfo, streamletFactory); Assert.assertSame(consumer, consumer2); ShardInfo shardInfoWithSameShardIdButDifferentConcurrencyToken = new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumer3 = + IShardConsumer consumer3 = worker.createOrGetShardConsumer(shardInfoWithSameShardIdButDifferentConcurrencyToken, streamletFactory); Assert.assertNotNull(consumer3); Assert.assertNotSame(consumer3, consumer); @@ -419,10 +419,10 @@ public final void testCleanupShardConsumers() { new ShardInfo(dummyKinesisShardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo2 = new ShardInfo(anotherDummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - ShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); - ShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = + IShardConsumer consumerOfShardInfo1 = worker.createOrGetShardConsumer(shardInfo1, streamletFactory); + IShardConsumer consumerOfDuplicateOfShardInfo1ButWithAnotherConcurrencyToken = worker.createOrGetShardConsumer(duplicateOfShardInfo1ButWithAnotherConcurrencyToken, streamletFactory); - ShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); + IShardConsumer consumerOfShardInfo2 = worker.createOrGetShardConsumer(shardInfo2, streamletFactory); Set assignedShards = new HashSet(); assignedShards.add(shardInfo1); @@ -1219,11 +1219,11 @@ public void testShutdownDoesNotBlockOnCompletedLeases() throws Exception { false, shardPrioritization); - final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); + final Map shardInfoShardConsumerMap = worker.getShardInfoShardConsumerMap(); final ShardInfo completedShardInfo = KinesisClientLibLeaseCoordinator.convertLeaseToAssignment(completedLease); - final ShardConsumer completedShardConsumer = mock(ShardConsumer.class); + final KinesisShardConsumer completedShardConsumer = mock(KinesisShardConsumer.class); shardInfoShardConsumerMap.put(completedShardInfo, completedShardConsumer); - when(completedShardConsumer.getCurrentState()).thenReturn(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); + when(completedShardConsumer.getCurrentState()).thenReturn(KinesisConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE); Callable callable = worker.createWorkerShutdownCallable(); assertThat(worker.hasGracefulShutdownStarted(), equalTo(false)); @@ -1338,11 +1338,11 @@ public List answer(InvocationOnMock invocation) throws Throwable { verify(executorService).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); } @@ -1451,11 +1451,11 @@ public List answer(InvocationOnMock invocation) throws Throwable { verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo1))))); verify(executorService, never()).submit(argThat(both(isA(MetricsCollectingTaskDecorator.class)) .and(TaskTypeMatcher.isOfType(TaskType.SHUTDOWN)).and(ReflectionFieldMatcher - .withField(ShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); + .withField(KinesisShutdownTask.class, "shardInfo", equalTo(shardInfo2))))); @@ -2013,19 +2013,19 @@ public void describeTo(Description description) { @Override protected boolean matchesSafely(MetricsCollectingTaskDecorator item, Description mismatchDescription) { return Condition.matched(item, mismatchDescription) - .and(new Condition.Step() { + .and(new Condition.Step() { @Override - public Condition apply(MetricsCollectingTaskDecorator value, + public Condition apply(MetricsCollectingTaskDecorator value, Description mismatch) { - if (!(value.getOther() instanceof ShutdownTask)) { + if (!(value.getOther() instanceof KinesisShutdownTask)) { mismatch.appendText("Wrapped task isn't a shutdown task"); return Condition.notMatched(); } - return Condition.matched((ShutdownTask) value.getOther(), mismatch); + return Condition.matched((KinesisShutdownTask) value.getOther(), mismatch); } - }).and(new Condition.Step() { + }).and(new Condition.Step() { @Override - public Condition apply(ShutdownTask value, Description mismatch) { + public Condition apply(KinesisShutdownTask value, Description mismatch) { return Condition.matched(value.getReason(), mismatch); } }).matching(matcher);