From a97f0fd2d0535cf30dcf4447f1093e84c40b4566 Mon Sep 17 00:00:00 2001 From: Ethan Katnic Date: Tue, 5 Nov 2024 18:00:41 -0800 Subject: [PATCH] Revert decorator change to make scheduler assessors private --- .../amazon/kinesis/coordinator/Scheduler.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 1a9dfe87e..0adb69f93 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -119,14 +119,13 @@ /** * */ -@Getter(AccessLevel.PRIVATE) +@Getter @Accessors(fluent = true) @Slf4j @KinesisClientInternalApi public class Scheduler implements Runnable { private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1; - private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L; private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L; private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L; @@ -148,9 +147,7 @@ public class Scheduler implements Runnable { private final ProcessorConfig processorConfig; private final RetrievalConfig retrievalConfig; - @Getter(AccessLevel.PACKAGE) private final String applicationName; - private final int maxInitializationAttempts; private final Checkpointer checkpoint; private final long shardConsumerDispatchPollIntervalMillis; @@ -173,8 +170,6 @@ public class Scheduler implements Runnable { private final long failoverTimeMillis; private final long taskBackoffTimeMillis; private final boolean isMultiStreamMode; - - @Getter(AccessLevel.PACKAGE) private final Map currentStreamConfigMap = new StreamConfigMap(); private final StreamTracker streamTracker; @@ -188,29 +183,28 @@ public class Scheduler implements Runnable { private final Function hierarchicalShardSyncerProvider; private final long schedulerInitializationBackoffTimeMillis; private LeaderDecider leaderDecider; - - @Getter(AccessLevel.PACKAGE) private final Map staleStreamDeletionMap = new HashMap<>(); - private final LeaseCleanupManager leaseCleanupManager; private final SchemaRegistryDecoder schemaRegistryDecoder; - @Getter(AccessLevel.PACKAGE) private final DeletedStreamListProvider deletedStreamListProvider; + @Getter(AccessLevel.NONE) private final MigrationStateMachine migrationStateMachine; + + @Getter(AccessLevel.NONE) private final DynamicMigrationComponentsInitializer migrationComponentsInitializer; + + @Getter(AccessLevel.NONE) private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. - @Getter(AccessLevel.PACKAGE) private final ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>(); private volatile boolean shutdown; private volatile long shutdownStartTimeMillis; - @Getter(AccessLevel.PACKAGE) private volatile boolean shutdownComplete = false; private final Object lock = new Object(); @@ -218,6 +212,8 @@ public class Scheduler implements Runnable { private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted(); private boolean leasesSyncedOnAppInit = false; + + @Getter(AccessLevel.NONE) private final AtomicBoolean leaderSynced = new AtomicBoolean(false); /** @@ -229,6 +225,7 @@ public class Scheduler implements Runnable { * CountDownLatch used by the GracefulShutdownCoordinator. Reaching zero means that * the scheduler's finalShutdown() call has completed. */ + @Getter(AccessLevel.NONE) private final CountDownLatch finalShutdownLatch = new CountDownLatch(1); @VisibleForTesting