Skip to content

Commit

Permalink
Revert decorator change to make scheduler assessors private
Browse files Browse the repository at this point in the history
  • Loading branch information
ethkatnic committed Nov 6, 2024
1 parent 5b97424 commit a97f0fd
Showing 1 changed file with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,13 @@
/**
*
*/
@Getter(AccessLevel.PRIVATE)
@Getter
@Accessors(fluent = true)
@Slf4j
@KinesisClientInternalApi
public class Scheduler implements Runnable {

private static final int PERIODIC_SHARD_SYNC_MAX_WORKERS_DEFAULT = 1;

private static final long LEASE_TABLE_CHECK_FREQUENCY_MILLIS = 3 * 1000L;
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
Expand All @@ -148,9 +147,7 @@ public class Scheduler implements Runnable {
private final ProcessorConfig processorConfig;
private final RetrievalConfig retrievalConfig;

@Getter(AccessLevel.PACKAGE)
private final String applicationName;

private final int maxInitializationAttempts;
private final Checkpointer checkpoint;
private final long shardConsumerDispatchPollIntervalMillis;
Expand All @@ -173,8 +170,6 @@ public class Scheduler implements Runnable {
private final long failoverTimeMillis;
private final long taskBackoffTimeMillis;
private final boolean isMultiStreamMode;

@Getter(AccessLevel.PACKAGE)
private final Map<StreamIdentifier, StreamConfig> currentStreamConfigMap = new StreamConfigMap();

private final StreamTracker streamTracker;
Expand All @@ -188,36 +183,37 @@ public class Scheduler implements Runnable {
private final Function<StreamConfig, HierarchicalShardSyncer> hierarchicalShardSyncerProvider;
private final long schedulerInitializationBackoffTimeMillis;
private LeaderDecider leaderDecider;

@Getter(AccessLevel.PACKAGE)
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();

private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;

@Getter(AccessLevel.PACKAGE)
private final DeletedStreamListProvider deletedStreamListProvider;

@Getter(AccessLevel.NONE)
private final MigrationStateMachine migrationStateMachine;

@Getter(AccessLevel.NONE)
private final DynamicMigrationComponentsInitializer migrationComponentsInitializer;

@Getter(AccessLevel.NONE)
private final MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;

// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@Getter(AccessLevel.PACKAGE)
private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();

private volatile boolean shutdown;
private volatile long shutdownStartTimeMillis;

@Getter(AccessLevel.PACKAGE)
private volatile boolean shutdownComplete = false;

private final Object lock = new Object();

private final Stopwatch streamSyncWatch = Stopwatch.createUnstarted();

private boolean leasesSyncedOnAppInit = false;

@Getter(AccessLevel.NONE)
private final AtomicBoolean leaderSynced = new AtomicBoolean(false);

/**
Expand All @@ -229,6 +225,7 @@ public class Scheduler implements Runnable {
* CountDownLatch used by the GracefulShutdownCoordinator. Reaching zero means that
* the scheduler's finalShutdown() call has completed.
*/
@Getter(AccessLevel.NONE)
private final CountDownLatch finalShutdownLatch = new CountDownLatch(1);

@VisibleForTesting
Expand Down

0 comments on commit a97f0fd

Please sign in to comment.