Skip to content

Commit

Permalink
updates log
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en committed Jan 16, 2025
1 parent 55fa6e2 commit 3297ddd
Showing 1 changed file with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1759,34 +1759,28 @@ public Set<String> forceCommit(String tableNameWithType, @Nullable String partit
private void executeBatch(String tableNameWithType, Set<String> segmentBatchToCommit) {
sendForceCommitMessageToServers(tableNameWithType, segmentBatchToCommit);

try {
Thread.sleep(FORCE_COMMIT_STATUS_CHECK_INTERVAL_MS);
} catch (InterruptedException ignored) {
}

int attemptCount = 0;
final Set<String>[] segmentsYetToBeCommitted = new Set[]{new HashSet<>()};
try {
attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> isBatchSuccessful(tableNameWithType, segmentBatchToCommit));
attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> {
segmentsYetToBeCommitted[0] = getSegmentsYetToBeCommitted(tableNameWithType, segmentBatchToCommit);
return segmentsYetToBeCommitted[0].isEmpty();
});
} catch (AttemptsExceededException | RetriableOperationException e) {
String errorMsg =
String.format("Failed to execute the forceCommit batch of segments: %s , attempt count: %d",
segmentBatchToCommit,
attemptCount);
String errorMsg = String.format(
"Exception occurred while executing the forceCommit batch of segments: %s, attempt count: %d, "
+ "segmentsYetToBeCommitted: %s",
segmentBatchToCommit, attemptCount, segmentsYetToBeCommitted[0]);
LOGGER.error(errorMsg, e);
throw new RuntimeException(e);
}
}

private boolean isBatchSuccessful(String tableNameWithType,
Set<String> segmentBatchToCommit) {

Set<String> onlineSegmentsForTable =
_helixResourceManager.getOnlineSegmentsFromIdealState(tableNameWithType, false);

for (String segmentName : segmentBatchToCommit) {
if (!onlineSegmentsForTable.contains(segmentName)) {
return false;
}
}

return true;
}

private List<Set<String>> getSegmentBatchList(IdealState idealState, Set<String> targetConsumingSegments,
int batchSize) {
Map<String, Queue<String>> instanceToConsumingSegments =
Expand Down

0 comments on commit 3297ddd

Please sign in to comment.