Skip to content

Commit

Permalink
Having a way to override the consumerTaskFactory as part of LeaseMana…
Browse files Browse the repository at this point in the history
…gementConfig (#1441)
  • Loading branch information
gguptp authored Feb 24, 2025
1 parent 8deebe4 commit c9563ab
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.ConsumerTaskFactory;
import software.amazon.kinesis.lifecycle.KinesisConsumerTaskFactory;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
import software.amazon.kinesis.lifecycle.ShardConsumerArgument;
Expand Down Expand Up @@ -267,33 +266,6 @@ protected Scheduler(
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
this(
checkpointConfig,
coordinatorConfig,
leaseManagementConfig,
lifecycleConfig,
metricsConfig,
processorConfig,
retrievalConfig,
diagnosticEventFactory,
new KinesisConsumerTaskFactory());
}

/**
* Customers do not currently have the ability to customize the DiagnosticEventFactory, but this visibility
* is desired for testing. This constructor is only used for testing to provide a mock DiagnosticEventFactory.
*/
@VisibleForTesting
protected Scheduler(
@NonNull final CheckpointConfig checkpointConfig,
@NonNull final CoordinatorConfig coordinatorConfig,
@NonNull final LeaseManagementConfig leaseManagementConfig,
@NonNull final LifecycleConfig lifecycleConfig,
@NonNull final MetricsConfig metricsConfig,
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory,
@NonNull final ConsumerTaskFactory taskFactory) {
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
Expand Down Expand Up @@ -401,7 +373,7 @@ protected Scheduler(
this.schemaRegistryDecoder = this.retrievalConfig.glueSchemaRegistryDeserializer() == null
? null
: new SchemaRegistryDecoder(this.retrievalConfig.glueSchemaRegistryDeserializer());
this.taskFactory = taskFactory;
this.taskFactory = leaseManagementConfig().consumerTaskFactory();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
import software.amazon.kinesis.lifecycle.ConsumerTaskFactory;
import software.amazon.kinesis.lifecycle.KinesisConsumerTaskFactory;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.worker.metric.WorkerMetric;
Expand Down Expand Up @@ -215,6 +217,8 @@ public class LeaseManagementConfig {

private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

private ConsumerTaskFactory consumerTaskFactory = new KinesisConsumerTaskFactory();

private WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig =
new WorkerUtilizationAwareAssignmentConfig();

Expand Down

0 comments on commit c9563ab

Please sign in to comment.