Skip to content

Commit

Permalink
Make startup more robust and prevent auto topic creation when using C…
Browse files Browse the repository at this point in the history
…ruiseControlMetricsReporterSampler (#2211)
  • Loading branch information
k0b3rIT authored Feb 18, 2025
1 parent e9bf2d5 commit e9c41f4
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ public static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter,
* @param scaleMs the scale for computing the delay
* @param base the base for computing the delay
* @param maxAttempts the max number of attempts on calling the function
* @param maxSleepMs the maximum sleep time between retries
* @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
*/
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts, int maxSleepMs) {
if (maxAttempts > 0) {
int attempts = 0;
long timeToSleep = scaleMs;
Expand All @@ -179,6 +180,9 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
return false;
}
timeToSleep *= base;
if (maxSleepMs > 0 && timeToSleep > maxSleepMs) {
timeToSleep = maxSleepMs;
}
Thread.sleep(timeToSleep);
} catch (InterruptedException ignored) {

Expand All @@ -200,7 +204,21 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
*/
public static boolean retry(Supplier<Boolean> function, int maxAttempts) {
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts);
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts, -1);
}

/**
 * Convenience overload of {@code retry} that places no upper bound on the sleep time between attempts.
 * The {@code Supplier<Boolean>} function is retried while it returns {@code true}, up to the given number of attempts.
 * Delegates to the five-argument overload with {@code maxSleepMs} set to -1.
 *
 * @param function the code to call and retry if needed
 * @param scaleMs the scale for computing the delay
 * @param base the base for computing the delay
 * @param maxAttempts the max number of attempts on calling the function
 * @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
 * {@code true} if the function stopped requiring a retry before exceeding the max attempts.
 */
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
  // A non-positive maxSleepMs disables the cap on the sleep time between retries.
  final int noSleepCap = -1;
  return retry(function, scaleMs, base, maxAttempts, noSleepCap);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ public static <K, KT extends Deserializer<K>, V, VT extends Deserializer<V>> Con
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, configs.get(RECONNECT_BACKOFF_MS_CONFIG).toString());
consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
return new KafkaConsumer<>(consumerProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler {
// Configurations
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.sampler.topic.assert.attempts";

public static final int METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT = 5;

@Deprecated
public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L);
Expand Down Expand Up @@ -151,13 +156,26 @@ protected boolean refreshPartitionAssignment() {
return false;
}

/**
 * Checks whether the configured metrics reporter topic appears in the topic listing
 * returned by the metric consumer.
 *
 * @return {@code true} if the listing contains {@code _metricReporterTopic}, {@code false} otherwise.
 */
private boolean isMetricsTopicExists() {
  return _metricConsumer.listTopics().containsKey(_metricReporterTopic);
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
_metricReporterTopic = (String) configs.get(METRIC_REPORTER_TOPIC);
if (_metricReporterTopic == null) {
_metricReporterTopic = CruiseControlMetricsReporterConfig.DEFAULT_CRUISE_CONTROL_METRICS_TOPIC;
}
String metricTopicAssertAttemptsStr = (String) configs.get(METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS);
int metricTopicAssertAttempts = METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT;
if (metricTopicAssertAttemptsStr != null) {
metricTopicAssertAttempts = Integer.parseInt(metricTopicAssertAttemptsStr);
}
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
_acceptableMetricRecordProduceDelayMs = ACCEPTABLE_NETWORK_DELAY_MS
+ Math.max(reporterConfig.getLong(CruiseControlMetricsReporterConfig
Expand All @@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the Kafka cluster.", _metricReporterTopic);
if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 2000, 2, metricTopicAssertAttempts, 30_000)) {
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
+ "] in the Kafka cluster.");
}

if (refreshPartitionAssignment()) {
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches "
+ _metricReporterTopic + " in the target cluster.");
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches ["
+ _metricReporterTopic + "] in the Kafka cluster.");
}
}

Expand Down
11 changes: 6 additions & 5 deletions docs/wiki/User Guide/Configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ We are still trying to improve cruise control. And following are some configurat
## Configurations of pluggable classes

### CruiseControlMetricsReporterSampler configurations
| Name | Type | Required? | Default Value | Description |
|-------------------------------------------|--------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
| Name | Type | Required? | Default Value | Description |
|------------------------------------------------|---------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------|
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
| metric.reporter.sampler.topic.assert.attempts | Integer | N | 5 | Maximum number of attempts made while waiting for the metrics reporter topic to appear in the Kafka cluster during startup. |

### PrometheusMetricSampler configurations
| Name | Type | Required? | Default Value | Description |
Expand Down

0 comments on commit e9c41f4

Please sign in to comment.