Skip to content

Commit

Permalink
Update error handling logic in KafkaLeadershipWatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Nov 12, 2024
1 parent ea906c1 commit b5688d7
Showing 1 changed file with 59 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,23 @@ private void applyCurrentState() throws InterruptedException, KeeperException {
long startTime = System.currentTimeMillis();
Set<TopicPartition> currentLeadingPartitions = new HashSet<>();
int numPartitionsProcessed = 0;
for (String topic: zooKeeper.getChildren(TOPICS_ZK_PATH, true)) {
String partitionsPath = String.format("%s/%s/%s", TOPICS_ZK_PATH, topic, PARTITIONS_ZK_SUBPATH);
for (String partition: zooKeeper.getChildren(partitionsPath, true)) {
String partitionStatePath = String.format("%s/%s/state", partitionsPath, partition);
processNodeDataChangedForPartitionState(partitionStatePath, currentLeadingPartitions);
numPartitionsProcessed++;
try {
for (String topic: zooKeeper.getChildren(TOPICS_ZK_PATH, true)) {
String partitionsPath = String.format("%s/%s/%s", TOPICS_ZK_PATH, topic, PARTITIONS_ZK_SUBPATH);
for (String partition: zooKeeper.getChildren(partitionsPath, true)) {
String partitionStatePath = String.format("%s/%s/state", partitionsPath, partition);
processNodeDataChangedForPartitionState(partitionStatePath, currentLeadingPartitions);
numPartitionsProcessed++;
}
}
unwatchDeletedPartitions(currentLeadingPartitions);
LOG.info(String.format("Finished applying current ZK state in %dms. " +
"Number of partitions processed=%d, number of leading partitions=%d",
System.currentTimeMillis() - startTime, numPartitionsProcessed, currentLeadingPartitions.size()));
} catch (KeeperException | InterruptedException e) {
tryResetZkClient(e);
throw e;
}
unwatchDeletedPartitions(currentLeadingPartitions);
LOG.info(String.format("Finished applying current ZK state in %dms. " +
"Number of partitions processed=%d, number of leading partitions=%d",
System.currentTimeMillis() - startTime, numPartitionsProcessed, currentLeadingPartitions.size()));
}

public void start() throws InterruptedException, KeeperException {
Expand All @@ -112,56 +117,56 @@ public void stop() throws InterruptedException {
heartbeat.stop();
}

private void processNodeDataChangedForPartitionState(String path, Set<TopicPartition> currentLeadingPartitions) {
private void processNodeDataChangedForPartitionState(String path, Set<TopicPartition> currentLeadingPartitions) throws InterruptedException, KeeperException {
LOG.debug("Processing NodeDataChangedForPartitionState " + path);
String data = null;
try {
String data = null;
try {
data = new String(zooKeeper.getData(path, true, stat));
} catch (KeeperException.NoNodeException e) {
LOG.warn("Caught exception trying to get zk data from path. Don't panic if zNode was deleted," +
" we will unwatch it." + path, e);
}
TopicPartition topicPartition = fromPath(path);
if (data != null) {
TopicPartitionState topicPartitionState = gson.fromJson(data, TopicPartitionState.class);
if (topicPartitionState.getLeader() == environmentProvider.brokerId()) {
LOG.info(String.format("Current leader of %s matches this broker ID: %s", path, topicPartitionState.getLeader()));
currentLeadingPartitions.add(topicPartition);
leadingPartitions.add(topicPartition);
directoryTreeWatcher.watch(topicPartition);
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).updateCounter(
topicPartition.topic(),
topicPartition.partition(),
UploaderMetrics.KAFKA_LEADER_COUNT_METRIC,
1,
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId()
);
} else if (leadingPartitions.contains(topicPartition)) {
// leadership change event
unwatchPartition(topicPartition);
}
} else if (leadingPartitions.contains(topicPartition)) {
// node deletion event
unwatchPartition(topicPartition);
}
} catch (InterruptedException | KeeperException e) {
LOG.error(String.format("Hit a ZK exception while extracting from %s. Will bounce the ZK client.", path), e);
try {
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
null,
null,
UploaderMetrics.WATCHER_ZK_RESET_METRIC,
data = new String(zooKeeper.getData(path, true, stat));
} catch (KeeperException.NoNodeException e) {
LOG.warn("Caught exception trying to get zk data from path. Don't panic if zNode was deleted," +
" we will unwatch it." + path, e);
}
TopicPartition topicPartition = fromPath(path);
if (data != null) {
TopicPartitionState topicPartitionState = gson.fromJson(data, TopicPartitionState.class);
if (topicPartitionState.getLeader() == environmentProvider.brokerId()) {
LOG.info(String.format("Current leader of %s matches this broker ID: %s", path, topicPartitionState.getLeader()));
currentLeadingPartitions.add(topicPartition);
leadingPartitions.add(topicPartition);
directoryTreeWatcher.watch(topicPartition);
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).updateCounter(
topicPartition.topic(),
topicPartition.partition(),
UploaderMetrics.KAFKA_LEADER_COUNT_METRIC,
1,
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId()
);
stop();
initialize(directoryTreeWatcher);
} catch (IOException | InterruptedException ex) {
LOG.error("Could not restore the ZK client.", ex);
throw new RuntimeException(ex);
} else if (leadingPartitions.contains(topicPartition)) {
// leadership change event
unwatchPartition(topicPartition);
}
} else if (leadingPartitions.contains(topicPartition)) {
// node deletion event
unwatchPartition(topicPartition);
}
}

private void tryResetZkClient(Exception e) {
LOG.error("Hit a ZK exception. Will bounce the ZK client.", e);
try {
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
null,
null,
UploaderMetrics.WATCHER_ZK_RESET_METRIC,
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId()
);
stop();
initialize(directoryTreeWatcher);
} catch (IOException | InterruptedException ex) {
LOG.error("Could not restore the ZK client.", ex);
throw new RuntimeException(ex);
}
}

Expand Down

0 comments on commit b5688d7

Please sign in to comment.