
KAFKA-17510: Exception handling and purgatory completion on initialization delay (apache#17709)

Reviewers: Jun Rao <junrao@gmail.com>
apoorvmittal10 authored Nov 14, 2024
1 parent 6147a31 commit 1834030
Showing 11 changed files with 658 additions and 290 deletions.
87 changes: 63 additions & 24 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,8 +25,9 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.purgatory.DelayedOperation;
+ import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
- import org.apache.kafka.server.share.fetch.ShareFetchData;
+ import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -55,7 +56,7 @@ public class DelayedShareFetch extends DelayedOperation {

private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);

- private final ShareFetchData shareFetchData;
+ private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;

private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
@@ -66,12 +67,12 @@ public class DelayedShareFetch extends DelayedOperation {
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;

DelayedShareFetch(
- ShareFetchData shareFetchData,
+ ShareFetch shareFetch,
ReplicaManager replicaManager,
SharePartitionManager sharePartitionManager,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions) {
- super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
- this.shareFetchData = shareFetchData;
+ super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+ this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
this.partitionsAcquired = new LinkedHashMap<>();
this.partitionsAlreadyFetched = new LinkedHashMap<>();
@@ -91,10 +92,10 @@ public void onExpiration() {
@Override
public void onComplete() {
log.trace("Completing the delayed share fetch request for group {}, member {}, "
+ "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
partitionsAcquired.keySet());

- if (shareFetchData.future().isDone())
+ if (shareFetch.isCompleted())
return;

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
@@ -107,11 +108,11 @@ public void onComplete() {

if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
- shareFetchData.future().complete(Collections.emptyMap());
+ shareFetch.maybeComplete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
- topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams());
+ topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());

try {
Map<TopicIdPartition, LogReadResult> responseData;
@@ -126,11 +127,11 @@ public void onComplete() {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));

- shareFetchData.future().complete(ShareFetchUtils.processFetchResponse(shareFetchData, fetchPartitionsData,
+ shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(shareFetch, fetchPartitionsData,
sharePartitions, replicaManager));
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
- sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
+ handleFetchException(shareFetch, topicPartitionData.keySet(), e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(topicPartitionData.keySet());
@@ -140,7 +141,7 @@ public void onComplete() {
// we directly call delayedShareFetchPurgatory.checkAndComplete
replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition ->
replicaManager.completeDelayedShareFetchRequest(
- new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
+ new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()))));
}
}

@@ -170,13 +171,13 @@ public boolean tryComplete() {
return completedByMe;
} else {
log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
"topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
sharePartitions.keySet());
releasePartitionLocks(topicPartitionData.keySet());
}
} else {
log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
"topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
sharePartitions.keySet());
}
return false;
@@ -198,7 +199,7 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
- int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
+ int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
// Add the share partition to the list of partitions to be fetched only if we can
// acquire the fetch lock on it.
if (sharePartition.maybeAcquireFetchLock()) {
@@ -266,7 +267,16 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.Partition
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
FetchRequest.PartitionData partitionData = entry.getValue();
- LogOffsetMetadata endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+
+ LogOffsetMetadata endOffsetMetadata;
+ try {
+ endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition);
+ } catch (Exception e) {
+ shareFetch.addErroneous(topicIdPartition, e);
+ sharePartitionManager.handleFencedSharePartitionException(
+ new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e);
+ continue;
+ }

if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
@@ -280,14 +290,14 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.Partition

if (fetchOffsetMetadata.messageOffset > endOffsetMetadata.messageOffset) {
log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of " +
"topicIdPartition {}", shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
"topicIdPartition {}", shareFetch.groupId(), shareFetch.memberId(), topicIdPartition);
return true;
} else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) {
if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
// This can happen when the fetch operation is falling behind the current segment or the partition
// has just rolled a new segment.
log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older " +
"segments of topicIdPartition {}", shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
"segments of topicIdPartition {}", shareFetch.groupId(), shareFetch.memberId(), topicIdPartition);
return true;
} else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
// we take the partition fetch size as upper bound when accumulating the bytes.
@@ -296,15 +306,15 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.Partition
}
}
}
- return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+ return accumulatedSize >= shareFetch.fetchParams().minBytes;
}

private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
- Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ Partition partition = ShareFetchUtils.partition(replicaManager, topicIdPartition.topicPartition());
LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
// The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can
// extend it to support other FetchIsolation types.
- FetchIsolation isolationType = shareFetchData.fetchParams().isolation;
+ FetchIsolation isolationType = shareFetch.fetchParams().isolation;
if (isolationType == FetchIsolation.LOG_END)
return offsetSnapshot.logEndOffset;
else if (isolationType == FetchIsolation.HIGH_WATERMARK)
@@ -315,11 +325,17 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
}

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+ // Filter if there already exists any erroneous topic partition.
+ Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
+ if (partitionsToFetch.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
- shareFetchData.fetchParams(),
+ shareFetch.fetchParams(),
CollectionConverters.asScala(
- topicPartitionData.entrySet().stream().map(entry ->
- new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList())
+ partitionsToFetch.stream().map(topicIdPartition ->
+ new Tuple2<>(topicIdPartition, topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
),
QuotaFactory.UNBOUNDED_QUOTA,
true);
@@ -339,6 +355,29 @@ private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult>
.anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
}

+ /**
+ * The handleFetchException method is used to handle the exception that occurred while reading from log.
+ * The method will handle the exception for each topic-partition in the request. The share partition
+ * might get removed from the cache.
+ * <p>
+ * The replica read request might error out for one share partition
+ * but as we cannot determine which share partition errored out, we might remove all the share partitions
+ * in the request.
+ *
+ * @param shareFetch The share fetch request.
+ * @param topicIdPartitions The topic-partitions in the replica read request.
+ * @param throwable The exception that occurred while fetching messages.
+ */
+ private void handleFetchException(
+ ShareFetch shareFetch,
+ Set<TopicIdPartition> topicIdPartitions,
+ Throwable throwable
+ ) {
+ topicIdPartitions.forEach(topicIdPartition -> sharePartitionManager.handleFencedSharePartitionException(
+ new SharePartitionKey(shareFetch.groupId(), topicIdPartition), throwable));
+ shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
+ }
+
// Visible for testing.
Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
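The new calls above — shareFetch.maybeComplete(...), maybeCompleteWithException(...), addErroneous(...), filterErroneousTopicPartitions(...) and isCompleted() — belong to the ShareFetch class that replaces ShareFetchData in this commit; its definition is in one of the other changed files and is not shown in this excerpt. A minimal sketch of the contract DelayedShareFetch relies on, assuming a single CompletableFuture per request and an internal map of erroneous partitions (class name, fields and simplified types are illustrative, not the actual implementation):

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.TopicIdPartition;

// Illustrative stand-in for org.apache.kafka.server.share.fetch.ShareFetch; only the methods
// invoked by DelayedShareFetch above are sketched, with the response type left generic.
class ShareFetchSketch<T> {
    private final CompletableFuture<Map<TopicIdPartition, T>> future = new CompletableFuture<>();
    private final Map<TopicIdPartition, Throwable> erroneous = new HashMap<>();

    // Record a partition that failed (e.g. fenced or not the leader) before or during the log read.
    synchronized void addErroneous(TopicIdPartition topicIdPartition, Throwable throwable) {
        erroneous.put(topicIdPartition, throwable);
    }

    // Exclude partitions that already failed so readFromLog does not fetch them again.
    synchronized Set<TopicIdPartition> filterErroneousTopicPartitions(Set<TopicIdPartition> topicIdPartitions) {
        Set<TopicIdPartition> remaining = new LinkedHashSet<>(topicIdPartitions);
        remaining.removeAll(erroneous.keySet());
        return remaining;
    }

    boolean isCompleted() {
        return future.isDone();
    }

    // Complete at most once with whatever was read; the real class presumably also reports the
    // erroneous partitions as per-partition errors in the response.
    synchronized void maybeComplete(Map<TopicIdPartition, T> partitionData) {
        if (!future.isDone()) {
            future.complete(partitionData);
        }
    }

    // Mark the given partitions as failed with the supplied exception and complete the request.
    synchronized void maybeCompleteWithException(Set<TopicIdPartition> topicIdPartitions, Throwable throwable) {
        topicIdPartitions.forEach(topicIdPartition -> erroneous.putIfAbsent(topicIdPartition, throwable));
        maybeComplete(Collections.emptyMap());
    }
}

The essential behaviour that onComplete() and handleFetchException() depend on is that completion happens at most once and that partitions recorded as erroneous are skipped by later log reads.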
12 changes: 8 additions & 4 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -29,7 +29,7 @@
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
- import org.apache.kafka.server.share.fetch.ShareFetchData;
+ import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchPartitionData;

import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class ShareFetchUtils {
* by acquiring records from the share partition.
*/
static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchResponse(
- ShareFetchData shareFetchData,
+ ShareFetch shareFetch,
Map<TopicIdPartition, FetchPartitionData> responseData,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ReplicaManager replicaManager
@@ -91,7 +91,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
+ ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
@@ -151,11 +151,15 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM
}

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
+ return partition(replicaManager, tp).getLeaderEpoch();
+ }
+
+ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
Partition partition = replicaManager.getPartitionOrException(tp);
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
- return partition.getLeaderEpoch();
+ return partition;
}
}
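The refactor above splits the leader check out of leaderEpoch into a separate partition(replicaManager, tp) helper so that DelayedShareFetch.endOffsetMetadataForTopicPartition (see the first file) can reuse it. A hedged caller-side sketch of the pattern this enables, with the helper name illustrative and the surrounding fields assumed to be in scope as in DelayedShareFetch:

// Illustrative only: a NotLeaderOrFollowerException (or any other failure) from the helper is
// recorded as a per-partition error instead of failing the whole delayed share fetch, mirroring
// the new try/catch in isMinBytesSatisfied.
private Optional<LogOffsetSnapshot> offsetSnapshotOrRecordError(TopicIdPartition topicIdPartition) {
    try {
        Partition partition = ShareFetchUtils.partition(replicaManager, topicIdPartition.topicPartition());
        return Optional.of(partition.fetchOffsetSnapshot(Optional.empty(), true));
    } catch (Exception e) {
        // Record the failure and skip this partition.
        shareFetch.addErroneous(topicIdPartition, e);
        return Optional.empty();
    }
}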
4 changes: 2 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -1082,8 +1082,8 @@ boolean canAcquireRecords() {

/**
* Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same
- * share partition does not enter a fetch queue while another one is being fetched within the queue.
- * The fetch lock is released once the records are fetched from the leader.
+ * share partition is not fetched concurrently by multiple clients. The fetch lock is released once
+ * the records are fetched and acquired.
*
* @return A boolean which indicates whether the fetch lock is acquired.
*/
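The reworded javadoc describes the fetch-lock contract: at most one request fetches a given share partition at a time, and the lock is held until the fetched records have been acquired. A minimal sketch of that lifecycle, assuming releaseFetchLock() is the counterpart of the maybeAcquireFetchLock() call seen in acquirablePartitions() above (the helper name and callback are illustrative):

// Illustrative only: acquire the fetch lock, read and acquire records, then release the lock so
// the next delayed share fetch for this share partition can proceed.
static void fetchWithLock(SharePartition sharePartition, Runnable readAndAcquire) {
    if (!sharePartition.maybeAcquireFetchLock()) {
        return; // another request is already fetching this share partition
    }
    try {
        readAndAcquire.run(); // e.g. read from the log, then acquire the records for the member
    } finally {
        sharePartition.releaseFetchLock();
    }
}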