Skip to content

Commit

Permalink
fixes bug
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en committed Jan 15, 2025
1 parent 470c6eb commit 8de7bfc
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ private void sendForceCommitMessageToServers(String tableNameWithType, Set<Strin
Criteria recipientCriteria = new Criteria();
recipientCriteria.setInstanceName("%");
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setResource(tableNameWithType);/**/
recipientCriteria.setSessionSpecific(true);
ForceCommitMessage message = new ForceCommitMessage(tableNameWithType, consumingSegments, batchSize);
int numMessagesSent = _helixManager.getMessagingService().send(recipientCriteria, message, null, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,17 +605,15 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames, int

TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
List<RealtimeSegmentDataManager> segmentsToCommit = getSegmentsToCommit(tableDataManager, segmentNames);
ExecutorService executorService = Executors.newFixedThreadPool(1);

try {
List<List<RealtimeSegmentDataManager>> segmentBatchList = divideSegmentsInBatches(segmentsToCommit, batchSize);

CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
for (List<RealtimeSegmentDataManager> segmentBatchToCommit : segmentBatchList) {
future = future.thenRun(() -> executeBatch(tableDataManager, segmentBatchToCommit));
executorService.submit(() -> executeBatch(tableDataManager, segmentBatchToCommit));
}

future.join();
} finally {
executorService.shutdown();
for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentsToCommit) {
tableDataManager.releaseSegment(realtimeSegmentDataManager);
}
Expand Down Expand Up @@ -670,6 +668,11 @@ private void executeBatch(TableDataManager tableDataManager, List<RealtimeSegmen
realtimeSegmentDataManager.forceCommit();
}

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int attemptCount = 0;
try {
attemptCount = DEFAULT_RETRY_POLICY.attempt(() -> isBatchSuccessful(tableDataManager, segmentBatchToCommit));
Expand Down

0 comments on commit 8de7bfc

Please sign in to comment.