From 1834030107b93fe2c08fcb48451f14b0d764051d Mon Sep 17 00:00:00 2001
From: Apoorv Mittal
Date: Thu, 14 Nov 2024 15:44:42 +0000
Subject: [PATCH] KAFKA-17510: Exception handling and purgatory completion on
 initialization delay (#17709)

Reviewers: Jun Rao
---
 .../kafka/server/share/DelayedShareFetch.java |  87 +++++---
 .../kafka/server/share/ShareFetchUtils.java   |  12 +-
 .../kafka/server/share/SharePartition.java    |   4 +-
 .../server/share/SharePartitionManager.java   | 136 +++++-------
 .../server/share/DelayedShareFetchTest.java   | 107 ++++++----
 .../server/share/ShareFetchUtilsTest.java     |  26 +--
 .../share/SharePartitionManagerTest.java      | 128 +++++++++--
 .../ShareFetchAcknowledgeRequestTest.scala    |  78 +++++--
 .../kafka/server/share/fetch/ShareFetch.java  | 202 ++++++++++++++++++
 .../server/share/fetch/ShareFetchData.java    |  78 -------
 .../server/share/fetch/ShareFetchTest.java    |  90 ++++++++
 11 files changed, 658 insertions(+), 290 deletions(-)
 create mode 100644 share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java
 delete mode 100644 share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java
 create mode 100644 share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 4cb9ce0cf4241..a5cd79ee35897 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -25,8 +25,9 @@
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.purgatory.DelayedOperation;
+import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
@@ -55,7 +56,7 @@ public class DelayedShareFetch extends DelayedOperation {
     private static final Logger log = LoggerFactory.getLogger(DelayedShareFetch.class);

-    private final ShareFetchData shareFetchData;
+    private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;

     private Map partitionsAcquired;
@@ -66,12 +67,12 @@ public class DelayedShareFetch extends DelayedOperation {
     private final LinkedHashMap sharePartitions;

     DelayedShareFetch(
-        ShareFetchData shareFetchData,
+        ShareFetch shareFetch,
         ReplicaManager replicaManager,
         SharePartitionManager sharePartitionManager,
         LinkedHashMap sharePartitions) {
-        super(shareFetchData.fetchParams().maxWaitMs, Optional.empty());
-        this.shareFetchData = shareFetchData;
+        super(shareFetch.fetchParams().maxWaitMs, Optional.empty());
+        this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
         this.partitionsAcquired = new LinkedHashMap<>();
         this.partitionsAlreadyFetched = new LinkedHashMap<>();
@@ -91,10 +92,10 @@ public void onExpiration() {
     @Override
     public void onComplete() {
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
-            + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+            + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
             partitionsAcquired.keySet());

-        if (shareFetchData.future().isDone())
+        if (shareFetch.isCompleted())
return; Map topicPartitionData; @@ -107,11 +108,11 @@ public void onComplete() { if (topicPartitionData.isEmpty()) { // No locks for share partitions could be acquired, so we complete the request with an empty response. - shareFetchData.future().complete(Collections.emptyMap()); + shareFetch.maybeComplete(Collections.emptyMap()); return; } log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetchData.groupId(), shareFetchData.fetchParams()); + topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); try { Map responseData; @@ -126,11 +127,11 @@ public void onComplete() { for (Map.Entry entry : responseData.entrySet()) fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false)); - shareFetchData.future().complete(ShareFetchUtils.processFetchResponse(shareFetchData, fetchPartitionsData, + shareFetch.maybeComplete(ShareFetchUtils.processFetchResponse(shareFetch, fetchPartitionsData, sharePartitions, replicaManager)); } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e); + handleFetchException(shareFetch, topicPartitionData.keySet(), e); } finally { // Releasing the lock to move ahead with the next request in queue. releasePartitionLocks(topicPartitionData.keySet()); @@ -140,7 +141,7 @@ public void onComplete() { // we directly call delayedShareFetchPurgatory.checkAndComplete replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition -> replicaManager.completeDelayedShareFetchRequest( - new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); } } @@ -170,13 +171,13 @@ public boolean tryComplete() { return completedByMe; } else { log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " + - "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(), + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet()); releasePartitionLocks(topicPartitionData.keySet()); } } else { log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " + - "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(), + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet()); } return false; @@ -198,7 +199,7 @@ Map acquirablePartitions() { Map topicPartitionData = new LinkedHashMap<>(); sharePartitions.forEach((topicIdPartition, sharePartition) -> { - int partitionMaxBytes = shareFetchData.partitionMaxBytes().getOrDefault(topicIdPartition, 0); + int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0); // Add the share partition to the list of partitions to be fetched only if we can // acquire the fetch lock on it. 
if (sharePartition.maybeAcquireFetchLock()) { @@ -266,7 +267,16 @@ private boolean isMinBytesSatisfied(Map entry : topicPartitionData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey(); FetchRequest.PartitionData partitionData = entry.getValue(); - LogOffsetMetadata endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition); + + LogOffsetMetadata endOffsetMetadata; + try { + endOffsetMetadata = endOffsetMetadataForTopicPartition(topicIdPartition); + } catch (Exception e) { + shareFetch.addErroneous(topicIdPartition, e); + sharePartitionManager.handleFencedSharePartitionException( + new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e); + continue; + } if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) continue; @@ -280,14 +290,14 @@ private boolean isMinBytesSatisfied(Map endOffsetMetadata.messageOffset) { log.debug("Satisfying delayed share fetch request for group {}, member {} since it is fetching later segments of " + - "topicIdPartition {}", shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition); + "topicIdPartition {}", shareFetch.groupId(), shareFetch.memberId(), topicIdPartition); return true; } else if (fetchOffsetMetadata.messageOffset < endOffsetMetadata.messageOffset) { if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) { // This can happen when the fetch operation is falling behind the current segment or the partition // has just rolled a new segment. log.debug("Satisfying delayed share fetch request for group {}, member {} immediately since it is fetching older " + - "segments of topicIdPartition {}", shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition); + "segments of topicIdPartition {}", shareFetch.groupId(), shareFetch.memberId(), topicIdPartition); return true; } else if (fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) { // we take the partition fetch size as upper bound when accumulating the bytes. @@ -296,15 +306,15 @@ private boolean isMinBytesSatisfied(Map= shareFetchData.fetchParams().minBytes; + return accumulatedSize >= shareFetch.fetchParams().minBytes; } private LogOffsetMetadata endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) { - Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + Partition partition = ShareFetchUtils.partition(replicaManager, topicIdPartition.topicPartition()); LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true); // The FetchIsolation type that we use for share fetch is FetchIsolation.HIGH_WATERMARK. In the future, we can // extend it to support other FetchIsolation types. - FetchIsolation isolationType = shareFetchData.fetchParams().isolation; + FetchIsolation isolationType = shareFetch.fetchParams().isolation; if (isolationType == FetchIsolation.LOG_END) return offsetSnapshot.logEndOffset; else if (isolationType == FetchIsolation.HIGH_WATERMARK) @@ -315,11 +325,17 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK) } private Map readFromLog(Map topicPartitionData) { + // Filter if there already exists any erroneous topic partition. 
+ Set partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet()); + if (partitionsToFetch.isEmpty()) { + return Collections.emptyMap(); + } + Seq> responseLogResult = replicaManager.readFromLog( - shareFetchData.fetchParams(), + shareFetch.fetchParams(), CollectionConverters.asScala( - topicPartitionData.entrySet().stream().map(entry -> - new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList()) + partitionsToFetch.stream().map(topicIdPartition -> + new Tuple2<>(topicIdPartition, topicPartitionData.get(topicIdPartition))).collect(Collectors.toList()) ), QuotaFactory.UNBOUNDED_QUOTA, true); @@ -339,6 +355,29 @@ private boolean anyPartitionHasLogReadError(Map .anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code()); } + /** + * The handleFetchException method is used to handle the exception that occurred while reading from log. + * The method will handle the exception for each topic-partition in the request. The share partition + * might get removed from the cache. + *

+ * The replica read request might error out for one share partition + * but as we cannot determine which share partition errored out, we might remove all the share partitions + * in the request. + * + * @param shareFetch The share fetch request. + * @param topicIdPartitions The topic-partitions in the replica read request. + * @param throwable The exception that occurred while fetching messages. + */ + private void handleFetchException( + ShareFetch shareFetch, + Set topicIdPartitions, + Throwable throwable + ) { + topicIdPartitions.forEach(topicIdPartition -> sharePartitionManager.handleFencedSharePartitionException( + new SharePartitionKey(shareFetch.groupId(), topicIdPartition), throwable)); + shareFetch.maybeCompleteWithException(topicIdPartitions, throwable); + } + // Visible for testing. Map combineLogReadResponse(Map topicPartitionData, Map existingFetchedData) { diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java index 3515362152b02..e3608128eb5fe 100644 --- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java +++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java @@ -29,7 +29,7 @@ import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; -import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchPartitionData; import org.slf4j.Logger; @@ -55,7 +55,7 @@ public class ShareFetchUtils { * by acquiring records from the share partition. */ static Map processFetchResponse( - ShareFetchData shareFetchData, + ShareFetch shareFetch, Map responseData, LinkedHashMap sharePartitions, ReplicaManager replicaManager @@ -91,7 +91,7 @@ static Map processFetchR partitionData.setErrorMessage(Errors.NONE.message()); } } else { - ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData); + ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData); log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition); // Maybe, in the future, check if no records are acquired, and we want to retry // replica manager fetch. 
Depends on the share partition manager implementation, @@ -151,11 +151,15 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM } static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) { + return partition(replicaManager, tp).getLeaderEpoch(); + } + + static Partition partition(ReplicaManager replicaManager, TopicPartition tp) { Partition partition = replicaManager.getPartitionOrException(tp); if (!partition.isLeader()) { log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition()); throw new NotLeaderOrFollowerException(); } - return partition.getLeaderEpoch(); + return partition; } } diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 71baea1017441..632cb1e316919 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -1082,8 +1082,8 @@ boolean canAcquireRecords() { /** * Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same - * share partition does not enter a fetch queue while another one is being fetched within the queue. - * The fetch lock is released once the records are fetched from the leader. + * share partition is not fetched concurrently by multiple clients. The fetch lock is released once + * the records are fetched and acquired. * * @return A boolean which indicates whether the fetch lock is acquired. */ diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 4288dd55703d7..1c6c5492372ff 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -49,7 +49,7 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey; -import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.session.ShareSession; import org.apache.kafka.server.share.session.ShareSessionCache; @@ -71,10 +71,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; /** * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. @@ -250,7 +248,7 @@ public CompletableFuture> fetchMessages( partitionMaxBytes.keySet(), groupId, fetchParams); CompletableFuture> future = new CompletableFuture<>(); - processShareFetch(new ShareFetchData(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords)); + processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, maxFetchRecords)); return future; } @@ -498,30 +496,6 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet } } - /** - * The handleFetchException method is used to handle the exception that occurred while reading from log. - * The method will handle the exception for each topic-partition in the request. The share partition - * might get removed from the cache. - *

- * The replica read request might error out for one share partition - * but as we cannot determine which share partition errored out, we might remove all the share partitions - * in the request. - * - * @param groupId The group id in the share fetch request. - * @param topicIdPartitions The topic-partitions in the replica read request. - * @param future The future to complete with the exception. - * @param throwable The exception that occurred while fetching messages. - */ - public void handleFetchException( - String groupId, - Set topicIdPartitions, - CompletableFuture> future, - Throwable throwable - ) { - topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable)); - maybeCompleteShareFetchWithException(future, topicIdPartitions, throwable); - } - /** * The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session. * @@ -564,20 +538,18 @@ private static String partitionsToLogString(Collection partiti } // Visible for testing. - void processShareFetch(ShareFetchData shareFetchData) { - if (shareFetchData.partitionMaxBytes().isEmpty()) { + void processShareFetch(ShareFetch shareFetch) { + if (shareFetch.partitionMaxBytes().isEmpty()) { // If there are no partitions to fetch then complete the future with an empty map. - shareFetchData.future().complete(Collections.emptyMap()); + shareFetch.maybeComplete(Collections.emptyMap()); return; } - // Initialize lazily, if required. - Map erroneous = null; List delayedShareFetchWatchKeys = new ArrayList<>(); LinkedHashMap sharePartitions = new LinkedHashMap<>(); - for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) { + for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) { SharePartitionKey sharePartitionKey = sharePartitionKey( - shareFetchData.groupId(), + shareFetch.groupId(), topicIdPartition ); @@ -585,15 +557,8 @@ void processShareFetch(ShareFetchData shareFetchData) { try { sharePartition = getOrCreateSharePartition(sharePartitionKey); } catch (Exception e) { - // Complete the whole fetch request with an exception if there is an error processing. - // The exception currently can be thrown only if there is an error while initializing - // the share partition. But skip the processing for other share partitions in the request - // as this situation is not expected. - log.error("Error processing share fetch request", e); - if (erroneous == null) { - erroneous = new HashMap<>(); - } - erroneous.put(topicIdPartition, e); + log.debug("Error processing share fetch request", e); + shareFetch.addErroneous(topicIdPartition, e); // Continue iteration for other partitions in the request. continue; } @@ -601,37 +566,42 @@ void processShareFetch(ShareFetchData shareFetchData) { // We add a key corresponding to each share partition in the request in the group so that when there are // acknowledgements/acquisition lock timeout etc., we have a way to perform checkAndComplete for all // such requests which are delayed because of lack of data to acquire for the share partition. 
-            delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(shareFetchData.groupId(), topicIdPartition.topicId(), topicIdPartition.partition()));
+            DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(shareFetch.groupId(),
+                topicIdPartition.topicId(), topicIdPartition.partition());
+            delayedShareFetchWatchKeys.add(delayedShareFetchKey);
             // We add a key corresponding to each topic partition in the request so that when the HWM is updated
             // for any topic partition, we have a way to perform checkAndComplete for all such requests which are
             // delayed because of lack of data to acquire for the topic partition.
             delayedShareFetchWatchKeys.add(new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
-            // The share partition is initialized asynchronously, so we need to wait for it to be initialized.
-            // But if the share partition is already initialized, then the future will be completed immediately.
-            // Hence, it's safe to call the maybeInitialize method and then wait for the future to be completed.
-            // TopicPartitionData list will be populated only if the share partition is already initialized.
-            sharePartition.maybeInitialize().whenComplete((result, throwable) -> {
+
+            CompletableFuture initializationFuture = sharePartition.maybeInitialize();
+            final boolean initialized = initializationFuture.isDone();
+            initializationFuture.whenComplete((result, throwable) -> {
                 if (throwable != null) {
-                    // TODO: Complete error handling for initialization. We have to record the error
-                    // for respective share partition as completing the full request might result in
-                    // some acquired records to not being sent: https://issues.apache.org/jira/browse/KAFKA-17510
-                    maybeCompleteInitializationWithException(sharePartitionKey, shareFetchData.future(), throwable);
+                    handleInitializationException(sharePartitionKey, shareFetch, throwable);
                 }
+                // Though the share partition is initialized asynchronously, if it is already initialized or
+                // has errored out then the future completes immediately. If the initialization does not
+                // complete immediately, the requests might be waiting in purgatory until the share partition
+                // is initialized. Hence, trigger the completion of all pending delayed share fetch requests
+                // for the share partition.
+                if (!initialized)
+                    replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
             });
             sharePartitions.put(topicIdPartition, sharePartition);
         }

         // If all the partitions in the request errored out, then complete the fetch request with an exception.
-        if (erroneous != null && erroneous.size() == shareFetchData.partitionMaxBytes().size()) {
-            completeShareFetchWithException(shareFetchData.future(), erroneous);
+        if (shareFetch.errorInAllPartitions()) {
+            shareFetch.maybeComplete(Collections.emptyMap());
             // Do not proceed with share fetch processing as all the partitions errored out.
             return;
         }

-        // TODO: If there exists some erroneous partitions then they will not be part of response.
-        // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
-        addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this, sharePartitions), delayedShareFetchWatchKeys);
+        // The request will be added irrespective of whether the share partition is initialized or not.
+        // Once the share partition is initialized, the delayed share fetch will be completed.
+        addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, this, sharePartitions), delayedShareFetchWatchKeys);
     }

     private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
@@ -657,28 +627,35 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
         });
     }

-    private void maybeCompleteInitializationWithException(
+    private void handleInitializationException(
         SharePartitionKey sharePartitionKey,
-        CompletableFuture> future,
+        ShareFetch shareFetch,
         Throwable throwable) {
         if (throwable instanceof LeaderNotAvailableException) {
             log.debug("The share partition with key {} is not initialized yet", sharePartitionKey);
-            // Do not process the fetch request for this partition as the leader is not initialized yet.
-            // The fetch request will be retried in the next poll.
-            // TODO: Add the request to delayed fetch purgatory.
+            // Skip any handling for this error as the share partition is still loading. The fetch request
+            // will be added to the purgatory and will be completed once it either times out or the share
+            // partition initialization completes.
             return;
         }

         // Remove the partition from the cache as it's failed to initialize.
-        partitionCacheMap.remove(sharePartitionKey);
-        // The partition initialization failed, so complete the request with the exception.
-        // The server should not be in this state, so log the error on broker and surface the same
-        // to the client. The broker should not be in this state, investigate the root cause of the error.
-        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
-        maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
+        SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
+        if (sharePartition != null) {
+            sharePartition.markFenced();
+        }
+        // The partition initialization failed, so add the partition to the erroneous partitions.
+        log.debug("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
     }

-    private void handleFencedSharePartitionException(
+    /**
+     * The method is used to handle the share partition exception.
+     *
+     * @param sharePartitionKey The share partition key.
+     * @param throwable The exception.
+ */ + public void handleFencedSharePartitionException( SharePartitionKey sharePartitionKey, Throwable throwable ) { @@ -695,23 +672,6 @@ private void handleFencedSharePartitionException( } } - private void maybeCompleteShareFetchWithException(CompletableFuture> future, - Collection topicIdPartitions, Throwable throwable) { - if (!future.isDone()) { - future.complete(topicIdPartitions.stream().collect(Collectors.toMap( - tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage())))); - } - } - - private void completeShareFetchWithException(CompletableFuture> future, - Map erroneous) { - future.complete(erroneous.entrySet().stream().collect(Collectors.toMap( - Map.Entry::getKey, entry -> { - Throwable t = entry.getValue(); - return new PartitionData().setErrorCode(Errors.forException(t).code()).setErrorMessage(t.getMessage()); - }))); - } - private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) { return new SharePartitionKey(groupId, topicIdPartition); } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 0c7b488f18020..f1f708f9dd99f 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; -import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -113,13 +113,13 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); when(sp0.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .build()); @@ -150,7 +150,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); @@ -172,7 +172,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + 
.withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .build()); @@ -205,7 +205,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); @@ -223,7 +223,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() { mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .build()); @@ -256,7 +256,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); when(sp0.canAcquireRecords()).thenReturn(true); @@ -268,7 +268,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) .build()); @@ -301,13 +301,14 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), - new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + future, partitionMaxBytes, MAX_FETCH_RECORDS); when(sp0.canAcquireRecords()).thenReturn(false); when(sp1.canAcquireRecords()).thenReturn(false); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) .build()); @@ -315,7 +316,7 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() { delayedShareFetch.forceComplete(); // Since no partition could be acquired, the future should be empty and replicaManager.readFromLog should not be called. 
- assertEquals(0, shareFetchData.future().join().size()); + assertEquals(0, future.join().size()); Mockito.verify(replicaManager, times(0)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); assertTrue(delayedShareFetch.isCompleted()); @@ -343,7 +344,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); when(sp0.canAcquireRecords()).thenReturn(true); @@ -352,7 +353,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) .build()); @@ -365,7 +366,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() { Mockito.verify(sp0, times(1)).nextFetchOffset(); Mockito.verify(sp1, times(0)).nextFetchOffset(); assertTrue(delayedShareFetch.isCompleted()); - assertTrue(shareFetchData.future().isDone()); + assertTrue(shareFetch.isCompleted()); Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); } @@ -384,14 +385,14 @@ public void testToCompleteAnAlreadyCompletedFuture() { sharePartitions.put(tp0, sp0); CompletableFuture> future = new CompletableFuture<>(); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), future, partitionMaxBytes, MAX_FETCH_RECORDS); when(sp0.maybeAcquireFetchLock()).thenReturn(true); when(sp0.canAcquireRecords()).thenReturn(false); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) .build()); @@ -402,7 +403,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { assertTrue(delayedShareFetch.isCompleted()); // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch. Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); - assertEquals(0, shareFetchData.future().join().size()); + assertEquals(0, future.join().size()); // Force completing the share fetch request for the second time should hit the future completion check and not // proceed ahead in the function. 
@@ -438,7 +439,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { sharePartitions1.put(tp1, sp1); sharePartitions1.put(tp2, sp2); - ShareFetchData shareFetchData1 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes1, MAX_FETCH_RECORDS); DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( @@ -450,7 +451,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData1) + .withShareFetchData(shareFetch1) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions1) .build(); @@ -460,12 +461,12 @@ public void testForceCompleteTriggersDelayedActionsQueue() { delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, delayedShareFetchWatchKeys); assertEquals(2, delayedShareFetchPurgatory.watched()); - assertFalse(shareFetchData1.future().isDone()); + assertFalse(shareFetch1.isCompleted()); Map partitionMaxBytes2 = new HashMap<>(); partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES); partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES); - ShareFetchData shareFetchData2 = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes2, MAX_FETCH_RECORDS); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); @@ -476,7 +477,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { sharePartitions2.put(tp2, sp2); DelayedShareFetch delayedShareFetch2 = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData2) + .withShareFetchData(shareFetch2) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions2) .build()); @@ -491,7 +492,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() { // requests, it should add a "check and complete" action for request key tp1 on the purgatory. 
delayedShareFetch2.forceComplete(); assertTrue(delayedShareFetch2.isCompleted()); - assertTrue(shareFetchData2.future().isDone()); + assertTrue(shareFetch2.isCompleted()); Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); assertFalse(delayedShareFetch1.isCompleted()); @@ -518,13 +519,13 @@ public void testCombineLogReadResponse() { sharePartitions.put(tp1, sp1); CompletableFuture> future = new CompletableFuture<>(); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), future, partitionMaxBytes, MAX_FETCH_RECORDS); DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(replicaManager) .withSharePartitions(sharePartitions) .build(); @@ -568,7 +569,7 @@ public void testExceptionInMinBytesCalculation() { LinkedHashMap sharePartitions = new LinkedHashMap<>(); sharePartitions.put(tp0, sp0); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS); @@ -583,20 +584,35 @@ public void testExceptionInMinBytesCalculation() { when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(partition); when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenThrow(new RuntimeException("Exception thrown")); + SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .withReplicaManager(replicaManager) + .withSharePartitionManager(sharePartitionManager) .build()); + + // Try complete should return false as the share partition has errored out. + assertFalse(delayedShareFetch.tryComplete()); + // Fetch should remain pending and should be completed on request timeout. assertFalse(delayedShareFetch.isCompleted()); + // The request should be errored out as topic partition should get added as erroneous. + assertTrue(shareFetch.errorInAllPartitions()); - // Since minBytes calculation throws an exception and returns true, tryComplete should return true. - assertTrue(delayedShareFetch.tryComplete()); + Mockito.verify(sharePartitionManager, times(1)).handleFencedSharePartitionException(any(), any()); + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); + + // Force complete the request as it's still pending. Return false from the share partition lock acquire. + when(sp0.maybeAcquireFetchLock()).thenReturn(false); + assertTrue(delayedShareFetch.forceComplete()); assertTrue(delayedShareFetch.isCompleted()); - Mockito.verify(replicaManager, times(2)).readFromLog( + + // Read from log and release partition locks should not be called as the request is errored out. 
+ Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); - // releasePartitionLocks will be called twice, once from tryComplete and then from onComplete. - Mockito.verify(delayedShareFetch, times(2)).releasePartitionLocks(any()); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(any()); } @Test @@ -615,11 +631,11 @@ public void testLocksReleasedForCompletedFetch() { doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions1) .withReplicaManager(replicaManager) .build(); @@ -643,11 +659,11 @@ public void testLocksReleasedAcquireException() { LinkedHashMap sharePartitions = new LinkedHashMap<>(); sharePartitions.put(tp0, sp0); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), MAX_FETCH_RECORDS); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withSharePartitions(sharePartitions) .build(); @@ -675,13 +691,13 @@ private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, Topic } static class DelayedShareFetchBuilder { - ShareFetchData shareFetchData = mock(ShareFetchData.class); + ShareFetch shareFetch = mock(ShareFetch.class); private ReplicaManager replicaManager = mock(ReplicaManager.class); - private final SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); + private SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class); private LinkedHashMap sharePartitions = mock(LinkedHashMap.class); - DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) { - this.shareFetchData = shareFetchData; + DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { + this.shareFetch = shareFetch; return this; } @@ -690,6 +706,11 @@ DelayedShareFetchBuilder withReplicaManager(ReplicaManager replicaManager) { return this; } + DelayedShareFetchBuilder withSharePartitionManager(SharePartitionManager sharePartitionManager) { + this.sharePartitionManager = sharePartitionManager; + return this; + } + DelayedShareFetchBuilder withSharePartitions(LinkedHashMap sharePartitions) { this.sharePartitions = sharePartitions; return this; @@ -701,7 +722,7 @@ public static DelayedShareFetchBuilder builder() { public DelayedShareFetch build() { return new DelayedShareFetch( - shareFetchData, + shareFetch, replicaManager, sharePartitionManager, sharePartitions); diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index 6ff0f90bc49d9..f8a53e9aa3382 100644 --- 
a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; -import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.storage.log.FetchIsolation; import org.apache.kafka.server.storage.log.FetchParams; import org.apache.kafka.server.storage.log.FetchPartitionData; @@ -101,7 +101,7 @@ public void testProcessFetchResponse() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId, + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes, 100); MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, @@ -124,7 +124,7 @@ public void testProcessFetchResponse() { records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); Map resultData = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, mock(ReplicaManager.class)); + ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, mock(ReplicaManager.class)); assertEquals(2, resultData.size()); assertTrue(resultData.containsKey(tp0)); @@ -167,7 +167,7 @@ public void testProcessFetchResponseWithEmptyRecords() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, memberId, + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes, 100); Map responseData = new HashMap<>(); @@ -178,7 +178,7 @@ public void testProcessFetchResponseWithEmptyRecords() { MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); Map resultData = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, mock(ReplicaManager.class)); + ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, mock(ReplicaManager.class)); assertEquals(2, resultData.size()); assertTrue(resultData.containsKey(tp0)); @@ -209,7 +209,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { sharePartitions.put(tp0, sp0); sharePartitions.put(tp1, sp1); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100); ReplicaManager replicaManager = mock(ReplicaManager.class); @@ -247,7 +247,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); Map resultData1 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitions, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetch, responseData1, sharePartitions, replicaManager); assertEquals(2, resultData1.size()); assertTrue(resultData1.containsKey(tp0)); @@ -276,7 +276,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() { MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); 
Map resultData2 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData2, sharePartitions, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetch, responseData2, sharePartitions, replicaManager); assertEquals(2, resultData2.size()); assertTrue(resultData2.containsKey(tp0)); @@ -303,7 +303,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() { LinkedHashMap sharePartitions = new LinkedHashMap<>(); sharePartitions.put(tp0, sp0); - ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), partitionMaxBytes, 100); ReplicaManager replicaManager = mock(ReplicaManager.class); @@ -327,7 +327,7 @@ tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, OptionalInt.empty(), false)); Map resultData = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, replicaManager); assertEquals(1, resultData.size()); assertTrue(resultData.containsKey(tp0)); @@ -342,7 +342,7 @@ tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, records, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); - resultData = ShareFetchUtils.processFetchResponse(shareFetchData, responseData, sharePartitions, replicaManager); + resultData = ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, replicaManager); assertEquals(1, resultData.size()); assertTrue(resultData.containsKey(tp0)); @@ -376,7 +376,7 @@ public void testProcessFetchResponseWithMaxFetchRecords() { Uuid memberId = Uuid.randomUuid(); // Set max fetch records to 10 - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId.toString(), new CompletableFuture<>(), partitionMaxBytes, 10); @@ -413,7 +413,7 @@ public void testProcessFetchResponseWithMaxFetchRecords() { responseData1.put(tp1, fetchPartitionData2); Map resultData1 = - ShareFetchUtils.processFetchResponse(shareFetchData, responseData1, sharePartitions, replicaManager); + ShareFetchUtils.processFetchResponse(shareFetch, responseData1, sharePartitions, replicaManager); assertEquals(2, resultData1.size()); assertTrue(resultData1.containsKey(tp0)); diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 46abf04b0a643..fbbccadff8bf4 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -65,7 +65,7 @@ import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; -import org.apache.kafka.server.share.fetch.ShareFetchData; +import org.apache.kafka.server.share.fetch.ShareFetch; import org.apache.kafka.server.share.persister.NoOpShareStatePersister; import org.apache.kafka.server.share.persister.Persister; import org.apache.kafka.server.share.session.ShareSession; @@ -223,7 +223,7 @@ public void testNewContextReturnsFinalContextWithRequestData() { ShareRequestMetadata 
reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH); - // shareFetchData is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements. + // shareFetch is not empty, but the maxBytes of topic partition is 0, which means this is added only for acknowledgements. // New context should be created successfully Map reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(tpId1, 0)); @@ -257,7 +257,7 @@ public void testNewContextReturnsFinalContextError() { ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, ShareRequestMetadata.FINAL_EPOCH); - // shareFetchData is not empty and the maxBytes of topic partition is not 0, which means this is trying to fetch on a Final request. + // shareFetch is not empty and the maxBytes of topic partition is not 0, which means this is trying to fetch on a Final request. // New context should throw an error Map reqData3 = Collections.singletonMap(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(tpId1, PARTITION_MAX_BYTES)); @@ -1665,7 +1665,7 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() { partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), @@ -1700,7 +1700,7 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() { sharePartitions.put(tp2, sp2); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(mockReplicaManager) .withSharePartitions(sharePartitions) .build(); @@ -1765,7 +1765,7 @@ public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() { partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), @@ -1801,7 +1801,7 @@ public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() { sharePartitions.put(tp3, sp3); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(mockReplicaManager) .withSharePartitions(sharePartitions) .build(); @@ -1861,7 +1861,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), @@ -1897,7 +1897,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { sharePartitions.put(tp2, sp2); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(mockReplicaManager) .withSharePartitions(sharePartitions) .build(); @@ -1965,7 +1965,7 @@ public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() { partitionCacheMap.put(new 
SharePartitionKey(groupId, tp2), sp2); partitionCacheMap.put(new SharePartitionKey(groupId, tp3), sp3); - ShareFetchData shareFetchData = new ShareFetchData( + ShareFetch shareFetch = new ShareFetch( FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), @@ -2002,7 +2002,7 @@ public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() { sharePartitions.put(tp3, sp3); DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() - .withShareFetchData(shareFetchData) + .withShareFetchData(shareFetch) .withReplicaManager(mockReplicaManager) .withSharePartitions(sharePartitions) .build(); @@ -2063,10 +2063,74 @@ public void testPendingInitializationShouldCompleteFetchRequest() throws Excepti // Verify that replica manager fetch is not called. Mockito.verify(mockReplicaManager, times(0)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); + assertFalse(pendingInitializationFuture.isDone()); // Complete the pending initialization future. pendingInitializationFuture.complete(null); } + @Test + public void testDelayedInitializationShouldCompleteFetchRequest() throws Exception { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + Uuid fooId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + Map partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES); + + SharePartition sp0 = mock(SharePartition.class); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0); + + // Keep the 2 initialization futures pending and 1 completed with leader not available exception. + CompletableFuture pendingInitializationFuture1 = new CompletableFuture<>(); + CompletableFuture pendingInitializationFuture2 = new CompletableFuture<>(); + when(sp0.maybeInitialize()). + thenReturn(pendingInitializationFuture1) + .thenReturn(pendingInitializationFuture2) + .thenReturn(CompletableFuture.failedFuture(new LeaderNotAvailableException("Leader not available"))); + + DelayedOperationPurgatory shareFetchPurgatorySpy = spy(new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true)); + mockReplicaManagerDelayedShareFetch(mockReplicaManager, shareFetchPurgatorySpy); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(mockReplicaManager).withTimer(mockTimer) + .build(); + + // Send 3 requests for share fetch for same share partition. + CompletableFuture> future1 = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + + CompletableFuture> future2 = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + + CompletableFuture> future3 = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); + + Mockito.verify(sp0, times(3)).maybeInitialize(); + Mockito.verify(mockReplicaManager, times(3)).addDelayedShareFetchRequest(any(), any()); + Mockito.verify(shareFetchPurgatorySpy, times(3)).tryCompleteElseWatch(any(), any()); + Mockito.verify(shareFetchPurgatorySpy, times(0)).checkAndComplete(any()); + + // All 3 requests should be pending. + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + assertFalse(future3.isDone()); + + // Complete one pending initialization future. 
+ pendingInitializationFuture1.complete(null); + Mockito.verify(mockReplicaManager, times(1)).completeDelayedShareFetchRequest(any()); + Mockito.verify(shareFetchPurgatorySpy, times(1)).checkAndComplete(any()); + + pendingInitializationFuture2.complete(null); + Mockito.verify(mockReplicaManager, times(2)).completeDelayedShareFetchRequest(any()); + Mockito.verify(shareFetchPurgatorySpy, times(2)).checkAndComplete(any()); + + // Verify that replica manager fetch is not called. + Mockito.verify(mockReplicaManager, times(0)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + } + @Test public void testSharePartitionInitializationExceptions() throws Exception { String groupId = "grp"; @@ -2100,6 +2164,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { // between SharePartitionManager and SharePartition to retry the request as SharePartition is not yet ready. assertFalse(future.isCompletedExceptionally()); assertTrue(future.join().isEmpty()); + Mockito.verify(sp0, times(0)).markFenced(); // Verify that the share partition is still in the cache on LeaderNotAvailableException. assertEquals(1, partitionCacheMap.size()); @@ -2111,6 +2176,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state"); + Mockito.verify(sp0, times(1)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2123,6 +2189,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available"); + Mockito.verify(sp0, times(2)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2135,6 +2202,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.INVALID_REQUEST, "Invalid request"); + Mockito.verify(sp0, times(3)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2147,6 +2215,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch"); + Mockito.verify(sp0, times(4)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. 
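Note for reviewers: the per-partition error codes asserted in the hunks above come from the exception-to-response mapping in the new ShareFetch.maybeCompleteWithException introduced later in this patch. A minimal sketch of that behaviour, assuming the same imports already used in this test class and an arbitrary FetchParams instance named fetchParams (both placeholders, not code from the patch):

    TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
    CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
    ShareFetch shareFetch = new ShareFetch(fetchParams, "grp", Uuid.randomUuid().toString(), future,
        Map.of(tp0, PARTITION_MAX_BYTES), 100);

    // A failed share partition initialization completes the pending future with a per-partition
    // error entry (Errors.forException maps FencedStateEpochException to FENCED_STATE_EPOCH)
    // instead of completing the future exceptionally.
    shareFetch.maybeCompleteWithException(List.of(tp0),
        new FencedStateEpochException("Fenced state epoch"));

    // future.join().get(tp0).errorCode() is Errors.FENCED_STATE_EPOCH.code();
    // a repeated call is a no-op because isCompleted() is already true.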
@@ -2159,6 +2228,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower"); + Mockito.verify(sp0, times(5)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); // The last exception removes the share partition from the cache hence re-add the share partition to cache. @@ -2171,6 +2241,7 @@ public void testSharePartitionInitializationExceptions() throws Exception { DELAYED_SHARE_FETCH_TIMEOUT_MS, () -> "Processing in delayed share fetch queue never ended."); validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception"); + Mockito.verify(sp0, times(6)).markFenced(); assertTrue(partitionCacheMap.isEmpty()); } @@ -2247,18 +2318,25 @@ public void testSharePartitionInitializationFailure() throws Exception { public void testSharePartitionPartialInitializationFailure() throws Exception { String groupId = "grp"; Uuid memberId1 = Uuid.randomUuid(); + // For tp0, share partition instantiation will fail. TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + // For tp1, share fetch should succeed. TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1)); - Map partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES); - - // Mark partition1 as not the leader. - Partition partition1 = mock(Partition.class); - when(partition1.isLeader()).thenReturn(false); - + // For tp2, share partition initialization will fail. + TopicIdPartition tp2 = new TopicIdPartition(memberId1, new TopicPartition("foo", 2)); + Map partitionMaxBytes = Map.of( + tp0, PARTITION_MAX_BYTES, + tp1, PARTITION_MAX_BYTES, + tp2, PARTITION_MAX_BYTES); + + // Mark partition0 as not the leader. + Partition partition0 = mock(Partition.class); + when(partition0.isLeader()).thenReturn(false); ReplicaManager replicaManager = mock(ReplicaManager.class); when(replicaManager.getPartitionOrException(any())) - .thenReturn(partition1); + .thenReturn(partition0); + // Mock share partition for tp1, so it can succeed. SharePartition sp1 = mock(SharePartition.class); Map partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1); @@ -2268,6 +2346,11 @@ public void testSharePartitionPartialInitializationFailure() throws Exception { when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0)); + // Fail initialization for tp2. + SharePartition sp2 = mock(SharePartition.class); + partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2); + when(sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture(new FencedStateEpochException("Fenced state epoch"))); + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); @@ -2289,11 +2372,16 @@ public void testSharePartitionPartialInitializationFailure() throws Exception { assertFalse(future.isCompletedExceptionally()); Map partitionDataMap = future.get(); - // For now only 1 successful partition is included, this will be fixed in subsequents PRs. 
- assertEquals(1, partitionDataMap.size()); + assertEquals(3, partitionDataMap.size()); + assertTrue(partitionDataMap.containsKey(tp0)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), partitionDataMap.get(tp0).errorCode()); assertTrue(partitionDataMap.containsKey(tp1)); assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode()); + assertTrue(partitionDataMap.containsKey(tp2)); + assertEquals(Errors.FENCED_STATE_EPOCH.code(), partitionDataMap.get(tp2).errorCode()); + assertEquals("Fenced state epoch", partitionDataMap.get(tp2).errorMessage()); + Mockito.verify(replicaManager, times(0)).completeDelayedShareFetchRequest(any()); Mockito.verify(replicaManager, times(1)).readFromLog( any(), any(), any(ReplicaQuota.class), anyBoolean()); } diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index 8097021e4cb52..d2679cd62eae9 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -16,6 +16,7 @@ */ package kafka.server +import kafka.utils.TestUtils import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterInstance, ClusterTest, ClusterTestDefaults, ClusterTestExtensions, ClusterTests, Type} import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords import org.apache.kafka.common.message.{ShareAcknowledgeRequestData, ShareAcknowledgeResponseData, ShareFetchRequestData, ShareFetchResponseData} @@ -253,13 +254,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) - val shareFetchResponseData = shareFetchResponse.data() - assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) - assertEquals(1, shareFetchResponseData.responses().size()) - assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) - assertEquals(3, shareFetchResponseData.responses().get(0).partitions().size()) + // For the multi partition fetch request, the response may not be available in the first attempt + // as the share partitions might not be initialized yet. So, we retry until we get the response. 
+ var responses = Seq[ShareFetchResponseData.PartitionData]() + TestUtils.waitUntilTrue(() => { + val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) + assertEquals(1, shareFetchResponseData.responses().size()) + val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size() + if (partitionsCount > 0) { + assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) + shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => { + if (!partitionData.acquiredRecords().isEmpty) { + responses = responses :+ partitionData + } + }) + } + responses.size == 3 + }, "Share fetch request failed", 5000) val expectedPartitionData1 = new ShareFetchResponseData.PartitionData() .setPartitionIndex(0) @@ -279,7 +293,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setAcknowledgeErrorCode(Errors.NONE.code()) .setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1))) - shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => { + responses.foreach(partitionData => { partitionData.partitionIndex() match { case 0 => compareFetchResponsePartitions(expectedPartitionData1, partitionData) case 1 => compareFetchResponsePartitions(expectedPartitionData2, partitionData) @@ -2230,13 +2244,26 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMap) - var shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) - var shareFetchResponseData = shareFetchResponse.data() - assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) - assertEquals(1, shareFetchResponseData.responses().size()) - assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) - assertEquals(2, shareFetchResponseData.responses().get(0).partitions().size()) + // For the multi partition fetch request, the response may not be available in the first attempt + // as the share partitions might not be initialized yet. So, we retry until we get the response. 
+ var responses = Seq[ShareFetchResponseData.PartitionData]() + TestUtils.waitUntilTrue(() => { + val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) + assertEquals(1, shareFetchResponseData.responses().size()) + val partitionsCount = shareFetchResponseData.responses().get(0).partitions().size() + if (partitionsCount > 0) { + assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) + shareFetchResponseData.responses().get(0).partitions().foreach(partitionData => { + if (!partitionData.acquiredRecords().isEmpty) { + responses = responses :+ partitionData + } + }) + } + responses.size == 2 + }, "Share fetch request failed", 5000) // Producing 10 more records to the topic partitions created above produceData(topicIdPartition1, 10) @@ -2247,9 +2274,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1) shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, Seq.empty, forget, acknowledgementsMap) - shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) - shareFetchResponseData = shareFetchResponse.data() + val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(1, shareFetchResponseData.responses().size()) assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId()) @@ -2265,10 +2292,25 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo compareFetchResponsePartitions(expectedPartitionData, partitionData) } + // For initial fetch request, the response may not be available in the first attempt when the share + // partition is not initialized yet. Hence, wait for response from all partitions before proceeding. 
private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition]): Unit = { - val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) - val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty) - connectAndReceive[ShareFetchResponse](shareFetchRequest) + val partitions: util.Set[Integer] = new util.HashSet() + TestUtils.waitUntilTrue(() => { + val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) + val shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, topicIdPartitions, Seq.empty, Map.empty) + val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponseData = shareFetchResponse.data() + + assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) + shareFetchResponseData.responses().foreach(response => { + if (!response.partitions().isEmpty) { + response.partitions().forEach(partitionData => partitions.add(partitionData.partitionIndex)) + } + }) + + partitions.size() == topicIdPartitions.size + }, "Share fetch request failed", 5000) } private def expectedAcquiredRecords(firstOffsets: util.List[Long], lastOffsets: util.List[Long], deliveryCounts: util.List[Int]): util.List[AcquiredRecords] = { diff --git a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java new file mode 100644 index 0000000000000..7c60501ffd67e --- /dev/null +++ b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetch.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.server.storage.log.FetchParams; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * The ShareFetch class is used to store the fetch parameters for a share fetch request. + */ +public class ShareFetch { + + /** + * The future that will be completed when the fetch is done. + */ + private final CompletableFuture> future; + + /** + * The fetch parameters for the fetch request. + */ + private final FetchParams fetchParams; + /** + * The group id of the share group that is fetching the records. 
+ */ + private final String groupId; + /** + * The member id of the share group that is fetching the records. + */ + private final String memberId; + /** + * The maximum number of bytes that can be fetched for each partition. + */ + private final Map partitionMaxBytes; + /** + * The maximum number of records that can be fetched for the request. + */ + private final int maxFetchRecords; + /** + * The partitions that had an error during the fetch. + */ + private Map erroneous; + + public ShareFetch( + FetchParams fetchParams, + String groupId, + String memberId, + CompletableFuture> future, + Map partitionMaxBytes, + int maxFetchRecords + ) { + this.fetchParams = fetchParams; + this.groupId = groupId; + this.memberId = memberId; + this.future = future; + this.partitionMaxBytes = partitionMaxBytes; + this.maxFetchRecords = maxFetchRecords; + } + + public String groupId() { + return groupId; + } + + public String memberId() { + return memberId; + } + + public Map partitionMaxBytes() { + return partitionMaxBytes; + } + + public FetchParams fetchParams() { + return fetchParams; + } + + public int maxFetchRecords() { + return maxFetchRecords; + } + + /** + * Add an erroneous partition to the share fetch request. If the erroneous map is null, it will + * be created. + *

+ * The method is synchronized to avoid concurrent modification of the erroneous map, as for + * some partitions the pending initialization can be on some threads and for other partitions + * the share fetch request can be processed in purgatory. + * + * @param topicIdPartition The partition that had an error. + * @param throwable The error that occurred. + */ + public synchronized void addErroneous(TopicIdPartition topicIdPartition, Throwable throwable) { + if (erroneous == null) { + erroneous = new HashMap<>(); + } + erroneous.put(topicIdPartition, throwable); + } + + /** + * Check if the share fetch request is completed. + * @return true if the request is completed, false otherwise. + */ + public boolean isCompleted() { + return future.isDone(); + } + + /** + * Check if all the partitions in the request have errored. + * @return true if all the partitions in the request have errored, false otherwise. + */ + public synchronized boolean errorInAllPartitions() { + return erroneous != null && erroneous.size() == partitionMaxBytes().size(); + } + + /** + * Maybe complete the share fetch request with the given partition data. If the request is already completed, + * this method does nothing. If there are any erroneous partitions, they will be added to the response. + * + * @param partitionData The partition data to complete the fetch with. + */ + public void maybeComplete(Map partitionData) { + if (isCompleted()) { + return; + } + + Map response = new HashMap<>(partitionData); + // Add any erroneous partitions to the response. + addErroneousToResponse(response); + future.complete(response); + } + + /** + * Maybe complete the share fetch request with the given exception for the topicIdPartitions. + * If the request is already completed, this method does nothing. If there are any erroneous partitions, + * they will be added to the response. + * + * @param topicIdPartitions The topic id partitions which errored out. + * @param throwable The exception to complete the fetch with. + */ + public void maybeCompleteWithException(Collection topicIdPartitions, Throwable throwable) { + if (isCompleted()) { + return; + } + Map response = topicIdPartitions.stream().collect( + Collectors.toMap(tp -> tp, tp -> new PartitionData() + .setErrorCode(Errors.forException(throwable).code()) + .setErrorMessage(throwable.getMessage()))); + // Add any erroneous partitions to the response. + addErroneousToResponse(response); + future.complete(response); + } + + /** + * Filter out the erroneous partitions from the given set of topicIdPartitions. The order of + * partitions is important, hence the method expects an ordered set as input and returns the ordered + * set as well. + * + * @param topicIdPartitions The topic id partitions to filter. + * @return The topic id partitions without the erroneous partitions.
+ */ + public synchronized Set filterErroneousTopicPartitions(Set topicIdPartitions) { + if (erroneous != null) { + Set retain = new LinkedHashSet<>(topicIdPartitions); + retain.removeAll(erroneous.keySet()); + return retain; + } + return topicIdPartitions; + } + + private synchronized void addErroneousToResponse(Map response) { + if (erroneous != null) { + erroneous.forEach((topicIdPartition, throwable) -> { + response.put(topicIdPartition, new PartitionData() + .setErrorCode(Errors.forException(throwable).code()) + .setErrorMessage(throwable.getMessage())); + }); + } + } +} diff --git a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java b/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java deleted file mode 100644 index c32b32800177f..0000000000000 --- a/share/src/main/java/org/apache/kafka/server/share/fetch/ShareFetchData.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.server.share.fetch; - -import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; -import org.apache.kafka.server.storage.log.FetchParams; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -/** - * The ShareFetchData class is used to store the fetch parameters for a share fetch request. 
- */ -public class ShareFetchData { - - private final FetchParams fetchParams; - private final String groupId; - private final String memberId; - private final CompletableFuture> future; - private final Map partitionMaxBytes; - private final int maxFetchRecords; - - public ShareFetchData( - FetchParams fetchParams, - String groupId, - String memberId, - CompletableFuture> future, - Map partitionMaxBytes, - int maxFetchRecords - ) { - this.fetchParams = fetchParams; - this.groupId = groupId; - this.memberId = memberId; - this.future = future; - this.partitionMaxBytes = partitionMaxBytes; - this.maxFetchRecords = maxFetchRecords; - } - - public String groupId() { - return groupId; - } - - public String memberId() { - return memberId; - } - - public CompletableFuture> future() { - return future; - } - - public Map partitionMaxBytes() { - return partitionMaxBytes; - } - - public FetchParams fetchParams() { - return fetchParams; - } - - public int maxFetchRecords() { - return maxFetchRecords; - } -} diff --git a/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java new file mode 100644 index 0000000000000..bf5de1ae0de41 --- /dev/null +++ b/share/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.storage.log.FetchParams; + +import org.junit.jupiter.api.Test; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class ShareFetchTest { + + private static final String GROUP_ID = "groupId"; + private static final String MEMBER_ID = "memberId"; + + @Test + public void testErrorInAllPartitions() { + TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), + Map.of(topicIdPartition, 10), 100); + assertFalse(shareFetch.errorInAllPartitions()); + + shareFetch.addErroneous(topicIdPartition, new RuntimeException()); + assertTrue(shareFetch.errorInAllPartitions()); + } + + @Test + public void testErrorInAllPartitionsWithMultipleTopicIdPartitions() { + TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), + Map.of(topicIdPartition0, 10, topicIdPartition1, 10), 100); + assertFalse(shareFetch.errorInAllPartitions()); + + shareFetch.addErroneous(topicIdPartition0, new RuntimeException()); + assertFalse(shareFetch.errorInAllPartitions()); + + shareFetch.addErroneous(topicIdPartition1, new RuntimeException()); + assertTrue(shareFetch.errorInAllPartitions()); + } + + @Test + public void testFilterErroneousTopicPartitions() { + TopicIdPartition topicIdPartition0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + ShareFetch shareFetch = new ShareFetch(mock(FetchParams.class), GROUP_ID, MEMBER_ID, new CompletableFuture<>(), + Map.of(topicIdPartition0, 10, topicIdPartition1, 10), 100); + Set result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1)); + // No erroneous partitions, hence all partitions should be returned. + assertEquals(2, result.size()); + assertTrue(result.contains(topicIdPartition0)); + assertTrue(result.contains(topicIdPartition1)); + + // Add an erroneous partition and verify that it is filtered out. + shareFetch.addErroneous(topicIdPartition0, new RuntimeException()); + result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1)); + assertEquals(1, result.size()); + assertTrue(result.contains(topicIdPartition1)); + + // Add another erroneous partition and verify that it is filtered out. + shareFetch.addErroneous(topicIdPartition1, new RuntimeException()); + result = shareFetch.filterErroneousTopicPartitions(Set.of(topicIdPartition0, topicIdPartition1)); + assertTrue(result.isEmpty()); + } + +}
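For reviewers, a short usage sketch of how the new ShareFetch is intended to be driven by the fetch path: partitions whose initialization fails are recorded with addErroneous, excluded from the acquirable set via filterErroneousTopicPartitions, and folded back into the response when maybeComplete fires. This is an illustrative sketch only, not code from the patch; fetchParams, the byte and record limits, and the class/method names of the sketch itself are placeholders.

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.errors.FencedStateEpochException;
    import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData;
    import org.apache.kafka.server.share.fetch.ShareFetch;
    import org.apache.kafka.server.storage.log.FetchParams;

    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    public class ShareFetchUsageSketch {

        // fetchParams would normally come from the share fetch request handling path;
        // it is passed in here so the sketch stays self-contained.
        static void sketch(FetchParams fetchParams) {
            TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
            TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
            CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
            ShareFetch shareFetch = new ShareFetch(fetchParams, "grp", "member-1", future,
                Map.of(tp0, 40960, tp1, 40960), 500);

            // tp0 failed initialization; record the error instead of failing the whole fetch.
            shareFetch.addErroneous(tp0, new FencedStateEpochException("Fenced state epoch"));

            // Only tp1 remains acquirable; the erroneous tp0 is filtered out of the ordered set.
            Set<TopicIdPartition> fetchable =
                shareFetch.filterErroneousTopicPartitions(new LinkedHashSet<>(List.of(tp0, tp1)));

            // Not all partitions errored, so the request is not completed early.
            boolean allErrored = shareFetch.errorInAllPartitions();

            // When the fetch for tp1 finishes, the stored error for tp0 is merged into the response,
            // so future.join() contains tp1's data plus tp0 with FENCED_STATE_EPOCH and its message.
            shareFetch.maybeComplete(Map.of(tp1, new PartitionData().setPartitionIndex(1)));
        }
    }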