Skip to content

Commit

Permalink
KAFKA-17956 Remove Admin.listShareGroups (apache#17912)
Browse files Browse the repository at this point in the history
KIP-1043 introduced Admin.listGroups as the way to list all types of groups. As a result, Admin.listShareGroups has been removed. This PR is the final step of the removal.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
AndrewJSchofield authored Nov 25, 2024
1 parent 54843e6 commit d17a149
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 439 deletions.
20 changes: 0 additions & 20 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1824,26 +1824,6 @@ default DescribeShareGroupsResult describeShareGroups(Collection<String> groupId
return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
}

/**
* List the share groups available in the cluster.
*
* @param options The options to use when listing the share groups.
* @return The ListShareGroupsResult.
*/
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);

/**
* List the share groups available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
* See the overload for more details.
*
* @return The ListShareGroupsResult.
*/
default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}

/**
* Describe some classic groups in the cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,6 @@ public DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds
return delegate.describeShareGroups(groupIds, options);
}

@Override
public ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) {
return delegate.listShareGroups(options);
}

@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3855,48 +3855,6 @@ public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(
return new DeleteConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
}

private static final class ListShareGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, ShareGroupListing> listings;
private final HashSet<Node> remaining;
private final KafkaFutureImpl<Collection<Object>> future;

ListShareGroupsResults(Collection<Node> leaders,
KafkaFutureImpl<Collection<Object>> future) {
this.errors = new ArrayList<>();
this.listings = new HashMap<>();
this.remaining = new HashSet<>(leaders);
this.future = future;
tryComplete();
}

synchronized void addError(Throwable throwable, Node node) {
ApiError error = ApiError.fromThrowable(throwable);
if (error.message() == null || error.message().isEmpty()) {
errors.add(error.error().exception("Error listing groups on " + node));
} else {
errors.add(error.error().exception("Error listing groups on " + node + ": " + error.message()));
}
}

synchronized void addListing(ShareGroupListing listing) {
listings.put(listing.groupId(), listing);
}

synchronized void tryComplete(Node leader) {
remaining.remove(leader);
tryComplete();
}

private synchronized void tryComplete() {
if (remaining.isEmpty()) {
ArrayList<Object> results = new ArrayList<>(listings.values());
results.addAll(errors);
future.complete(results);
}
}
}

@Override
public DescribeShareGroupsResult describeShareGroups(final Collection<String> groupIds,
final DescribeShareGroupsOptions options) {
Expand All @@ -3908,93 +3866,6 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) {
final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
final long nowMetadata = time.milliseconds();
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
Collection<Node> nodes = metadataResponse.brokers();
if (nodes.isEmpty())
throw new StaleMetadataException("Metadata fetch failed due to missing broker list");

HashSet<Node> allNodes = new HashSet<>(nodes);
final ListShareGroupsResults results = new ListShareGroupsResults(allNodes, all);

for (final Node node : allNodes) {
final long nowList = time.milliseconds();
runnable.call(new Call("listShareGroups", deadline, new ConstantNodeIdProvider(node.id())) {
@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> states = options.states()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
List<String> types = Collections.singletonList(GroupType.SHARE.toString());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setStatesFilter(states)
.setTypesFilter(types)
);
}

private void maybeAddShareGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<GroupState> state = group.groupState().isEmpty()
? Optional.empty()
: Optional.of(GroupState.parse(group.groupState()));
final ShareGroupListing groupListing = new ShareGroupListing(groupId, state);
results.addListing(groupListing);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
synchronized (results) {
Errors error = Errors.forCode(response.data().errorCode());
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
} else if (error != Errors.NONE) {
results.addError(error.exception(), node);
} else {
for (ListGroupsResponseData.ListedGroup group : response.data().groups()) {
maybeAddShareGroup(group);
}
}
results.tryComplete(node);
}
}

@Override
void handleFailure(Throwable throwable) {
synchronized (results) {
results.addError(throwable, node);
results.tryComplete(node);
}
}
}, nowList);
}
}

@Override
void handleFailure(Throwable throwable) {
KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable);
all.complete(Collections.singletonList(exception));
}
}, nowMetadata);

return new ListShareGroupsResult(all);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit d17a149

Please sign in to comment.