Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
noob-se7en committed Jan 15, 2025
1 parent 09d557e commit 32b7fd5
Showing 1 changed file with 34 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -603,44 +603,57 @@ public void forceCommit(String tableNameWithType, Set<String> segmentNames, int
Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String.format(
"Force commit is only supported for segments of realtime tables - table name: %s segment names: %s",
tableNameWithType, segmentNames));

TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
List<RealtimeSegmentDataManager> segmentsToCommit = getSegmentsToCommit(tableDataManager, segmentNames);

List<List<RealtimeSegmentDataManager>> segmentBatchList =
getSegmentBatchesToCommit(tableDataManager, segmentNames, batchSize);

ExecutorService executorService = Executors.newFixedThreadPool(1);

try {
List<List<RealtimeSegmentDataManager>> segmentBatchList = divideSegmentsInBatches(segmentsToCommit, batchSize);
for (List<RealtimeSegmentDataManager> segmentBatchToCommit : segmentBatchList) {
executorService.submit(() -> executeBatch(tableDataManager, segmentBatchToCommit));
executorService.submit(() -> {
try {
executeBatch(tableDataManager, segmentBatchToCommit);
} finally {
for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentBatchToCommit) {
tableDataManager.releaseSegment(realtimeSegmentDataManager);
}
}
});
}
} finally {
executorService.shutdown();
for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentsToCommit) {
tableDataManager.releaseSegment(realtimeSegmentDataManager);
}
}
}

private List<RealtimeSegmentDataManager> getSegmentsToCommit(TableDataManager tableDataManager,
Set<String> segmentNames) {
private List<List<RealtimeSegmentDataManager>> getSegmentBatchesToCommit(TableDataManager tableDataManager,
Set<String> segmentNames, int batchSize) {
List<RealtimeSegmentDataManager> segmentsToCommit = new ArrayList<>();

if (tableDataManager == null) {
return segmentsToCommit;
}
try {
if (tableDataManager == null) {
return new ArrayList<>();
}

for (String segmentName : segmentNames) {
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
if (segmentDataManager != null) {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
segmentsToCommit.add((RealtimeSegmentDataManager) segmentDataManager);
} else {
tableDataManager.releaseSegment(segmentDataManager);
for (String segmentName : segmentNames) {
SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
if (segmentDataManager != null) {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
segmentsToCommit.add((RealtimeSegmentDataManager) segmentDataManager);
} else {
tableDataManager.releaseSegment(segmentDataManager);
}
}
}
}

return segmentsToCommit;
return divideSegmentsInBatches(segmentsToCommit, batchSize);
} catch (Exception e) {
for (RealtimeSegmentDataManager realtimeSegmentDataManager : segmentsToCommit) {
tableDataManager.releaseSegment(realtimeSegmentDataManager);
}
throw new RuntimeException(e);
}
}

private List<List<RealtimeSegmentDataManager>> divideSegmentsInBatches(
Expand Down

0 comments on commit 32b7fd5

Please sign in to comment.