Skip to content

Commit

Permalink
KAFKA-18063: SnapshotRegistry should not leak memory (apache#17898)
Browse files Browse the repository at this point in the history
SnapshotRegistry needs to have a reference to all snapshot data structures. However, this should
not be a strong reference, but a weak reference, so that these data structures can be garbage
collected as needed. This PR also adds a scrub mechanism so that we can eventually reclaim the
slots used by GC'ed Revertable objects in the SnapshotRegistry.revertables array.

Reviewers: David Jacot <david.jacot@gmail.com>
  • Loading branch information
cmccabe authored Nov 21, 2024
1 parent 240efbb commit 5fba067
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

import org.slf4j.Logger;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;


/**
* A registry containing snapshots of timeline data structures.
* We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
* Therefore, we use ArrayLists here rather than a data structure with higher overhead.
* A registry containing snapshots of timeline data structures. All timeline data structures must
* be registered here, so that they can be reverted to the expected state when desired.
* Because the registry only keeps a weak reference to each timeline data structure, it does not
* prevent them from being garbage collected.
*/
public class SnapshotRegistry {
public static final long LATEST_EPOCH = Long.MAX_VALUE;
Expand Down Expand Up @@ -107,12 +108,39 @@ public Snapshot next() {
private final Snapshot head = new Snapshot(Long.MIN_VALUE);

/**
* Collection of all Revertable registered with this registry
* A collection of all Revertable objects registered here. Since we store only weak
* references, every time we access a revertable through this list, we must check to
* see if it has been garbage collected. If so, WeakReference.get will return null.
*
* Although the garbage collector handles freeing the underlying Revertables, over
* time slots in the ArrayList will fill up with expired references. Therefore, after
* enough registrations, we scrub the ArrayList of the expired references by creating
* a new ArrayList.
*/
private final List<Revertable> revertables = new ArrayList<>();
private List<WeakReference<Revertable>> revertables = new ArrayList<>();

/**
* The maximum number of registrations to allow before we compact the revertable list.
*/
private final int maxRegistrationsSinceScrub;

/**
* The number of registrations we have done since removing all expired weak references.
*/
private int numRegistrationsSinceScrub = 0;

/**
* The number of scrubs that we have done.
*/
private long numScrubs = 0;

/**
* Create a registry that uses the default scrub threshold of 10,000 registrations.
*
* @param logContext the log context passed through to the two-argument constructor
*/
public SnapshotRegistry(LogContext logContext) {
this(logContext, 10_000);
}

/**
* Create a registry with an explicit scrub threshold.
*
* @param logContext                 used to create the logger for this class
* @param maxRegistrationsSinceScrub the number of registrations allowed since the last scrub;
*                                   register() scrubs once its counter exceeds this value
*/
public SnapshotRegistry(LogContext logContext, int maxRegistrationsSinceScrub) {
this.log = logContext.logger(SnapshotRegistry.class);
this.maxRegistrationsSinceScrub = maxRegistrationsSinceScrub;
}

/**
Expand Down Expand Up @@ -283,21 +311,60 @@ public long latestEpoch() {
return head.prev().epoch();
}

/**
* Return the number of scrub operations that we have done.
*
* The counter is incremented by every call to scrub() and also by every call to reset(),
* since reset() rebuilds the revertable list without its expired references.
*
* @return the total number of scrubs performed by this registry so far
*/
public long numScrubs() {
return numScrubs;
}

/**
* Associate a revertable with this registry.
*
* The registry stores only a weak reference to the revertable. Each call counts as one
* registration; once the count since the last scrub exceeds maxRegistrationsSinceScrub,
* expired references are scrubbed before the new one is appended.
*
* @param revertable the object to track
*/
void register(Revertable revertable) {
// NOTE(review): the next line looks like the pre-change side of this diff (it adds a strong
// reference); the WeakReference add at the end of the method appears to replace it. Confirm
// against the raw patch.
revertables.add(revertable);
numRegistrationsSinceScrub++;
// Strictly greater-than: with a threshold of N, the scrub happens on registration N + 1.
if (numRegistrationsSinceScrub > maxRegistrationsSinceScrub) {
scrub();
}
// Added after the scrub, so the new reference lands in the freshly built list.
revertables.add(new WeakReference<>(revertable));
}

/**
 * Rebuild the revertable list so that it contains only weak references whose referent
 * is still alive, then record the scrub and restart the registration counter.
 */
void scrub() {
    // Presize to half of the current list; the list grows on demand if more entries survive.
    List<WeakReference<Revertable>> survivors = new ArrayList<>(revertables.size() / 2);
    for (WeakReference<Revertable> candidate : revertables) {
        if (candidate.get() == null) {
            // The referent has been garbage collected; drop this slot.
            continue;
        }
        survivors.add(candidate);
    }
    this.revertables = survivors;
    numRegistrationsSinceScrub = 0;
    numScrubs++;
}

/**
* Delete all snapshots and resets all of the Revertable object registered.
* Delete all snapshots and reset all of the Revertable objects.
*/
public void reset() {
// Per this method's Javadoc, this deletes all snapshots; LATEST_EPOCH is Long.MAX_VALUE.
deleteSnapshotsUpTo(LATEST_EPOCH);

// NOTE(review): the next two lines look like the pre-change side of this diff (iteration over
// strong references); the WeakReference loop below appears to replace them. Confirm against
// the raw patch.
for (Revertable revertable : revertables) {
revertable.reset();
// Build a replacement list during the same pass so expired references are dropped here too.
ArrayList<WeakReference<Revertable>> newRevertables = new ArrayList<>();
for (WeakReference<Revertable> ref : revertables) {
Revertable revertable = ref.get();
// A null referent means the Revertable has been garbage collected; it is skipped.
if (revertable != null) {
try {
revertable.reset();
} catch (Exception e) {
// Log and keep going so that one failing Revertable does not stop the others from
// being reset.
log.error("Error reverting {}", revertable, e);
}
// The reference is kept even when reset() threw, because the object is still alive.
newRevertables.add(ref);
}
}
// Expired references were removed above, so this counts as a scrub and restarts the
// registration counter.
numScrubs++;
this.revertables = newRevertables;
numRegistrationsSinceScrub = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,28 @@ public void testCreateSnapshotOfLatest() {

assertEquals(latest, duplicate);
}

@Test
public void testScrub() {
    // Scrub threshold of 2: the assertions below expect one scrub for every three
    // TimelineInteger constructions against this registry.
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext(), 2);

    for (int i = 0; i < 2; i++) {
        new TimelineInteger(snapshotRegistry).set(123);
    }
    assertEquals(0, snapshotRegistry.numScrubs());

    new TimelineInteger(snapshotRegistry).set(123);
    assertEquals(1, snapshotRegistry.numScrubs());

    for (int i = 0; i < 3; i++) {
        new TimelineInteger(snapshotRegistry).set(123);
    }
    assertEquals(2, snapshotRegistry.numScrubs());
}

@Test
public void testReset() {
    LogContext logContext = new LogContext();
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext, 2);
    TimelineInteger value = new TimelineInteger(snapshotRegistry);
    value.set(123);

    snapshotRegistry.reset();

    // After reset() the value is expected to be back at 0, and the reset counts as one scrub.
    assertEquals(0, value.get());
    assertEquals(1, snapshotRegistry.numScrubs());
}
}

0 comments on commit 5fba067

Please sign in to comment.