Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Sep 17, 2024
1 parent c24d8e8 commit aa04b64
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,6 @@ public void addSegment(ImmutableSegment segment) {
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
untrackNewlyAddedSegment(segment);
finishOperation();
}
}
Expand Down Expand Up @@ -1230,20 +1229,10 @@ public void untrackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.untrackSegment(segment);
}
if (segment instanceof MutableSegment) {
untrackNewlyAddedSegment(segment);
}
}

@VisibleForTesting
void trackNewlyAddedSegment(IndexSegment segment) {
if (_newSegmentTrackingTimeMs > 0) {
_newlyAddedSegments.put(segment.getSegmentName(), -1L);
}
}

@VisibleForTesting
void untrackNewlyAddedSegment(IndexSegment segment) {
if (_newSegmentTrackingTimeMs > 0) {
_newlyAddedSegments.put(segment.getSegmentName(), System.currentTimeMillis() + _newSegmentTrackingTimeMs);
}
Expand All @@ -1254,9 +1243,9 @@ public Set<String> getNewlyAddedSegments() {
// Untrack stale segments at query time. The overhead should be limited as the tracking map should be very small.
long nowMs = System.currentTimeMillis();
if (_logger.isDebugEnabled()) {
_logger.debug("Cleaning stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
_logger.debug("Removing stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
}
_newlyAddedSegments.values().removeIf(v -> v > 0 && v < nowMs);
_newlyAddedSegments.values().removeIf(v -> v < nowMs);
return _newlyAddedSegments.keySet();
}
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,7 @@ public void testConsistencyModeSnapshotWithUntrackedSegments()
}

@Test
public void testTrackUntrackNewlyAddedSegments()
throws InterruptedException {
public void testTrackNewlyAddedSegments() {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.getNewSegmentTrackingTimeMs()).thenReturn(0L);
DummyPartitionUpsertMetadataManager upsertMetadataManager =
Expand All @@ -653,16 +652,11 @@ public void testTrackUntrackNewlyAddedSegments()
upsertMetadataManager.trackNewlyAddedSegment(seg1);
upsertMetadataManager.trackNewlyAddedSegment(seg2);
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
// Segments are kept if not untracked explicitly, as delay timer starts after untracking.
Thread.sleep(300);
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
upsertMetadataManager.untrackNewlyAddedSegment(seg1);
upsertMetadataManager.untrackNewlyAddedSegment(seg2);
// There is 100ms delay before removal of stale segments.
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
DummyPartitionUpsertMetadataManager finalUpsertMetadataManager = upsertMetadataManager;
TestUtils.waitForCondition(aVoid -> finalUpsertMetadataManager.getNewlyAddedSegments().isEmpty(), 300L,
"Failed to untrack segments");
"Failed to remove stale segments");
}

@Test
Expand Down

0 comments on commit aa04b64

Please sign in to comment.