Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Sep 17, 2024
1 parent c24d8e8 commit d2eda3b
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
// complete upsert data view, e.g. the newly created consuming segment or newly uploaded immutable segments. Such
// segments can be processed by the server even before they get included in the broker's routing table. Server can
// remove a segment from this map if it knows the segment has been included in the broker's routing table. But
// there is no easy or efficient way for server to know if this happens on all brokers, so when removing a segment
// from this map, we allow to delay the removal for a configurable period, as best effort to wait for brokers to
// add the segment in routing tables. The delay timer starts after the segment is fully processed, as the segment
// processing time can vary greatly.
// there is no easy or efficient way for server to know if this happens on all brokers, so we track the segments
// for a configurable period, to wait for brokers to add the segment in routing tables.
private final Map<String, Long> _newlyAddedSegments = new ConcurrentHashMap<>();
private final long _newSegmentTrackingTimeMs;

Expand Down Expand Up @@ -433,7 +431,6 @@ public void addSegment(ImmutableSegment segment) {
if (_enableSnapshot) {
_snapshotLock.readLock().unlock();
}
untrackNewlyAddedSegment(segment);
finishOperation();
}
}
Expand Down Expand Up @@ -1230,20 +1227,10 @@ public void untrackSegmentForUpsertView(IndexSegment segment) {
if (_upsertViewManager != null) {
_upsertViewManager.untrackSegment(segment);
}
if (segment instanceof MutableSegment) {
untrackNewlyAddedSegment(segment);
}
}

@VisibleForTesting
void trackNewlyAddedSegment(IndexSegment segment) {
if (_newSegmentTrackingTimeMs > 0) {
_newlyAddedSegments.put(segment.getSegmentName(), -1L);
}
}

@VisibleForTesting
void untrackNewlyAddedSegment(IndexSegment segment) {
if (_newSegmentTrackingTimeMs > 0) {
_newlyAddedSegments.put(segment.getSegmentName(), System.currentTimeMillis() + _newSegmentTrackingTimeMs);
}
Expand All @@ -1254,9 +1241,9 @@ public Set<String> getNewlyAddedSegments() {
// Untrack stale segments at query time. The overhead should be limited as the tracking map should be very small.
long nowMs = System.currentTimeMillis();
if (_logger.isDebugEnabled()) {
_logger.debug("Cleaning stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
_logger.debug("Removing stale segments from tracking map: {} with nowMs: {}", _newlyAddedSegments, nowMs);
}
_newlyAddedSegments.values().removeIf(v -> v > 0 && v < nowMs);
_newlyAddedSegments.values().removeIf(v -> v < nowMs);
return _newlyAddedSegments.keySet();
}
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* thread can specify a freshness threshold query option to refresh the bitmap copies if not fresh enough.
*/
public class UpsertViewManager {
public static final long DEFAULT_NEW_SEGMENT_TRACKING_TIME_MS = 30000L;
public static final long DEFAULT_NEW_SEGMENT_TRACKING_TIME_MS = 10000;
private static final Logger LOGGER = LoggerFactory.getLogger(UpsertViewManager.class);
private final UpsertConfig.ConsistencyMode _consistencyMode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,7 @@ public void testConsistencyModeSnapshotWithUntrackedSegments()
}

@Test
public void testTrackUntrackNewlyAddedSegments()
throws InterruptedException {
public void testTrackNewlyAddedSegments() {
UpsertContext upsertContext = mock(UpsertContext.class);
when(upsertContext.getNewSegmentTrackingTimeMs()).thenReturn(0L);
DummyPartitionUpsertMetadataManager upsertMetadataManager =
Expand All @@ -653,16 +652,11 @@ public void testTrackUntrackNewlyAddedSegments()
upsertMetadataManager.trackNewlyAddedSegment(seg1);
upsertMetadataManager.trackNewlyAddedSegment(seg2);
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
// Segments are kept if not untracked explicitly, as delay timer starts after untracking.
Thread.sleep(300);
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
upsertMetadataManager.untrackNewlyAddedSegment(seg1);
upsertMetadataManager.untrackNewlyAddedSegment(seg2);
// There is 100ms delay before removal of stale segments.
assertEquals(upsertMetadataManager.getNewlyAddedSegments().size(), 2);
DummyPartitionUpsertMetadataManager finalUpsertMetadataManager = upsertMetadataManager;
TestUtils.waitForCondition(aVoid -> finalUpsertMetadataManager.getNewlyAddedSegments().isEmpty(), 300L,
"Failed to untrack segments");
"Failed to remove stale segments");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ public enum ConsistencyMode {
@JsonPropertyDescription("Refresh interval when using the snapshot consistency mode")
private long _upsertViewRefreshIntervalMs = 3000;

// As the time to process a segment can very long and vary greatly, so delay timer starts after the segment is
// fully processed on the server.
@JsonPropertyDescription("How long to track a newly added segment, after it is processed by the server.")
private long _newSegmentTrackingTimeMs = 30000;
// Setting this time to 0 to disable the tracking feature.
@JsonPropertyDescription("Track newly added segments on the server for a more complete upsert data view.")
private long _newSegmentTrackingTimeMs = 10000;

@JsonPropertyDescription("Custom class for upsert metadata manager")
private String _metadataManagerClass;
Expand Down

0 comments on commit d2eda3b

Please sign in to comment.