Skip to content

Commit

Permalink
MINOR: Cleanups in raft module (apache#17877)
Browse files Browse the repository at this point in the history
Reviewers: Yash Mayya <yash.mayya@gmail.com>
  • Loading branch information
mimaison authored Nov 21, 2024
1 parent 4f16887 commit c0a092f
Show file tree
Hide file tree
Showing 21 changed files with 67 additions and 68 deletions.
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/ElectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public boolean isLeader(int nodeId) {
public boolean isVotedCandidate(ReplicaKey nodeKey) {
if (nodeKey.id() < 0) {
throw new IllegalArgumentException("Invalid node key " + nodeKey);
} else if (!votedKey.isPresent()) {
} else if (votedKey.isEmpty()) {
return false;
} else if (votedKey.get().id() != nodeKey.id()) {
return false;
} else if (!votedKey.get().directoryId().isPresent()) {
} else if (votedKey.get().directoryId().isEmpty()) {
// when the persisted voted directory id is not present assume that we voted for this candidate;
// this happens when the kraft version is 0.
return true;
Expand All @@ -87,7 +87,7 @@ public boolean isVotedCandidate(ReplicaKey nodeKey) {
}

public int leaderId() {
if (!leaderId.isPresent())
if (leaderId.isEmpty())
throw new IllegalStateException("Attempt to access nil leaderId");
return leaderId.getAsInt();
}
Expand All @@ -101,7 +101,7 @@ public OptionalInt optionalLeaderId() {
}

public ReplicaKey votedKey() {
if (!votedKey.isPresent()) {
if (votedKey.isEmpty()) {
throw new IllegalStateException("Attempt to access nil votedId");
}

Expand Down
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/FollowerState.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void overrideFetchTimeout(long currentTimeMs, long timeoutMs) {
private long updateVoterPeriodMs() {
// Allow for a few rounds of fetch request before attempting to update
// the voter state
return fetchTimeoutMs * 3;
return fetchTimeoutMs * 3L;
}

public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
Expand All @@ -150,7 +150,7 @@ public void resetUpdateVoterPeriod(long currentTimeMs) {
}

public boolean updateHighWatermark(OptionalLong newHighWatermark) {
if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
if (newHighWatermark.isEmpty() && highWatermark.isPresent()) {
throw new IllegalArgumentException(
String.format("Attempt to overwrite current high watermark %s with unknown value", highWatermark)
);
Expand Down
28 changes: 14 additions & 14 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ private long endEpochElectionBackoff(Collection<ReplicaKey> preferredCandidates)
int position = 0;
for (ReplicaKey candidate : preferredCandidates) {
if (candidate.id() == quorum.localIdOrThrow()) {
if (!candidate.directoryId().isPresent() ||
if (candidate.directoryId().isEmpty() ||
candidate.directoryId().get().equals(quorum.localDirectoryId())
) {
// Found ourselves in the preferred candidate list
Expand Down Expand Up @@ -1788,7 +1788,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(

Optional<FetchSnapshotRequestData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotRequest
.forTopicPartition(data, log.topicPartition());
if (!partitionSnapshotOpt.isPresent()) {
if (partitionSnapshotOpt.isEmpty()) {
// The Raft client assumes that there is only one topic partition.
TopicPartition unknownTopicPartition = new TopicPartition(
data.topics().get(0).name(),
Expand Down Expand Up @@ -1828,7 +1828,7 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
);

Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
if (!snapshotOpt.isPresent() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
if (snapshotOpt.isEmpty() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
// The bootstrap checkpoint should not be replicated. The first leader will
// make sure that the content of the bootstrap checkpoint is included in the
// partition log
Expand Down Expand Up @@ -1944,7 +1944,7 @@ private boolean handleFetchSnapshotResponse(

Optional<FetchSnapshotResponseData.PartitionSnapshot> partitionSnapshotOpt = FetchSnapshotResponse
.forTopicPartition(data, log.topicPartition());
if (!partitionSnapshotOpt.isPresent()) {
if (partitionSnapshotOpt.isEmpty()) {
return false;
}

Expand Down Expand Up @@ -2098,7 +2098,7 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
}

Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
if (!newVoter.isPresent() || !newVoter.get().directoryId().isPresent()) {
if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand All @@ -2107,7 +2107,7 @@ private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
}

Endpoints newVoterEndpoints = Endpoints.fromAddVoterRequest(data.listeners());
if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) {
if (newVoterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand Down Expand Up @@ -2181,7 +2181,7 @@ private CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
}

Optional<ReplicaKey> oldVoter = RaftUtil.removeVoterRequestVoterKey(data);
if (!oldVoter.isPresent() || !oldVoter.get().directoryId().isPresent()) {
if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) {
return completedFuture(
new RemoveRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
Expand Down Expand Up @@ -2226,7 +2226,7 @@ private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
}

Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
if (voter.isEmpty() || voter.get().directoryId().isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
Expand All @@ -2238,7 +2238,7 @@ private CompletableFuture<UpdateRaftVoterResponseData> handleUpdateVoterRequest(
}

Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners());
if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
if (voterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
Expand Down Expand Up @@ -2319,8 +2319,8 @@ private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
return quorum.isLeader();
} else {
return epoch != quorum.epoch()
|| !leaderId.isPresent()
|| !quorum.leaderId().isPresent()
|| leaderId.isEmpty()
|| quorum.leaderId().isEmpty()
|| leaderId.equals(quorum.leaderId());
}
}
Expand Down Expand Up @@ -2516,7 +2516,7 @@ private boolean isValidVoterKey(Optional<ReplicaKey> voterKey) {
return voterKey
.map(key -> {
if (!OptionalInt.of(key.id()).equals(nodeId)) return false;
if (!key.directoryId().isPresent()) return true;
if (key.directoryId().isEmpty()) return true;

return key.directoryId().get().equals(nodeDirectoryId);
})
Expand Down Expand Up @@ -3399,7 +3399,7 @@ public void resign(int epoch) {
// Note that if we transition to another state before we have a chance to
// request resignation, then we consider the call fulfilled.
Optional<LeaderState<Object>> leaderStateOpt = quorum.maybeLeaderState();
if (!leaderStateOpt.isPresent()) {
if (leaderStateOpt.isEmpty()) {
logger.debug("Ignoring call to resign from epoch {} since this node is " +
"no longer the leader", epoch);
return;
Expand Down Expand Up @@ -3702,7 +3702,7 @@ private boolean shouldFireLeaderChange(LeaderAndEpoch leaderAndEpoch) {
return true;
} else {
return leaderAndEpoch.leaderId().isPresent() &&
!lastFiredLeaderChange.leaderId().isPresent();
lastFiredLeaderChange.leaderId().isEmpty();
}
}

Expand Down
7 changes: 3 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -112,7 +111,7 @@ protected LeaderState(
new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, voterNode.listeners())
);
}
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.grantingVoters = Set.copyOf(grantingVoters);
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
// use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.
Expand Down Expand Up @@ -809,9 +808,9 @@ void updateFollowerState(
public int compareTo(ReplicaState that) {
if (this.endOffset.equals(that.endOffset))
return this.replicaKey.compareTo(that.replicaKey);
else if (!this.endOffset.isPresent())
else if (this.endOffset.isEmpty())
return 1;
else if (!that.endOffset.isPresent())
else if (that.endOffset.isEmpty())
return -1;
else
return Long.compare(that.endOffset.get().offset(), this.endOffset.get().offset());
Expand Down
8 changes: 4 additions & 4 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
ElectionState election = readElectionState();

final EpochState initialState;
if (election.hasVoted() && !localId.isPresent()) {
if (election.hasVoted() && localId.isEmpty()) {
throw new IllegalStateException(
String.format(
"Initialized quorum state (%s) with a voted candidate but without a local id",
Expand Down Expand Up @@ -332,7 +332,7 @@ public Endpoints leaderEndpoints() {
}

public boolean isVoter() {
if (!localId.isPresent()) {
if (localId.isEmpty()) {
return false;
}

Expand Down Expand Up @@ -425,7 +425,7 @@ public void transitionToUnattachedVotedState(
epoch
)
);
} else if (!localId.isPresent()) {
} else if (localId.isEmpty()) {
throw new IllegalStateException("Cannot transition to voted without a replica id");
} else if (epoch < currentEpoch) {
throw new IllegalStateException(
Expand Down Expand Up @@ -707,7 +707,7 @@ public boolean isUnattached() {
}

public boolean isUnattachedNotVoted() {
return maybeUnattachedState().filter(unattached -> !unattached.votedKey().isPresent()).isPresent();
return maybeUnattachedState().filter(unattached -> unattached.votedKey().isEmpty()).isPresent();
}

public boolean isUnattachedAndVoted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
if (votedKey.isPresent()) {
ReplicaKey votedReplicaKey = votedKey.get();
if (votedReplicaKey.id() == candidateKey.id()) {
return !votedReplicaKey.directoryId().isPresent() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
return votedReplicaKey.directoryId().isEmpty() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
}
log.debug(
"Rejecting vote request from candidate ({}), already have voted for another " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(

// Check that the leader has established a HWM and committed the current epoch
Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
if (!highWatermark.isPresent()) {
if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -127,7 +127,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(

// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand Down Expand Up @@ -172,7 +172,7 @@ public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
this::buildApiVersionsRequest,
currentTimeMs
);
if (!timeout.isPresent()) {
if (timeout.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand Down Expand Up @@ -203,7 +203,7 @@ public boolean handleApiVersionsResponse(
long currentTimeMs
) {
Optional<AddVoterHandlerState> handlerState = leaderState.addVoterHandlerState();
if (!handlerState.isPresent()) {
if (handlerState.isEmpty()) {
// There are no pending add operation just ignore the api response
return true;
}
Expand Down Expand Up @@ -242,7 +242,7 @@ public boolean handleApiVersionsResponse(
return false;
}

// Check that the new voter supports the kraft.verion for reconfiguration
// Check that the new voter supports the kraft.version for reconfiguration
KRaftVersion kraftVersion = partitionState.lastKraftVersion();
if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public long timeUntilOperationExpiration(long currentTimeMs) {
}

public boolean expectingApiResponse(int replicaId) {
return !lastOffset.isPresent() && replicaId == voterKey.id();
return lastOffset.isEmpty() && replicaId == voterKey.id();
}

public void setLastOffset(long lastOffset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private RecordsBatchReader(
public boolean hasNext() {
ensureOpen();

if (!nextBatch.isPresent()) {
if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public RecordsIterator(
public boolean hasNext() {
ensureOpen();

if (!nextBatch.isPresent()) {
if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}

Expand Down Expand Up @@ -334,7 +334,7 @@ private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value)
throw new IllegalArgumentException("Got key in the record when no key was expected");
}

if (!value.isPresent()) {
if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
Expand All @@ -346,13 +346,13 @@ private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value)
}

private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
if (!key.isPresent()) {
if (key.isEmpty()) {
throw new IllegalArgumentException("Missing key in the record when a key was expected");
} else if (key.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty key in the record");
}

if (!value.isPresent()) {
if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value in the record");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Check that the leader has established a HWM and committed the current epoch
Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset);
if (!highWatermark.isPresent()) {
if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -117,7 +117,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry();
if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
Expand All @@ -132,7 +132,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(

// Remove the voter from the set of voters
Optional<VoterSet> newVoters = votersEntry.get().value().removeVoter(voterKey);
if (!newVoters.isPresent()) {
if (newVoters.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.VOTER_NOT_FOUND,
Expand Down
Loading

0 comments on commit c0a092f

Please sign in to comment.