Skip to content

Commit

Permalink
[pulsar] Added getTopics() implementation for Pulsar (#14857)
Browse files Browse the repository at this point in the history
* [pulsar] Added getTopics() implementation for Pulsar

* [pulsar] Added getTopics() implementation for Pulsar

* [pulsar] Fixed checkstyle

* Empty commit to trigger CI
  • Loading branch information
suvodeep-pyne authored Jan 22, 2025
1 parent 2babb6f commit 7cea247
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
public class PulsarConfig {
public static final String STREAM_TYPE = "pulsar";
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String SERVICE_HTTP_URL = "serviceHttpUrl";
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
public static final String OAUTH_ISSUER_URL = "issuerUrl";
Expand All @@ -53,6 +54,7 @@ public class PulsarConfig {
private final String _subscriberId;
private final String _pulsarTopicName;
private final String _bootstrapServers;
private final String _serviceHttpUrl;
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
Expand All @@ -79,6 +81,7 @@ public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
_bootstrapServers = getConfigValue(streamConfigMap, BOOTSTRAP_SERVERS);
Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the config");

_serviceHttpUrl = getConfigValue(streamConfigMap, SERVICE_HTTP_URL);
_subscriptionInitialPosition = PulsarUtils.offsetCriteriaToSubscription(streamConfig.getOffsetCriteria());
_authenticationToken = getConfigValue(streamConfigMap, AUTHENTICATION_TOKEN);
_tlsTrustCertsFilePath = getConfigValue(streamConfigMap, TLS_TRUST_CERTS_FILE_PATH);
Expand Down Expand Up @@ -149,6 +152,10 @@ public String getBootstrapServers() {
return _bootstrapServers;
}

public String getServiceHttpUrl() {
return _serviceHttpUrl;
}

public SubscriptionInitialPosition getInitialSubscriberPosition() {
return _subscriptionInitialPosition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;

import static com.google.common.base.Preconditions.checkArgument;


/**
* Manages the Pulsar client connection, given the partition id and {@link PulsarConfig}
Expand All @@ -44,35 +51,77 @@ public class PulsarPartitionLevelConnectionHandler implements Closeable {
protected PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig) {
_config = new PulsarConfig(streamConfig, clientId);
_clientId = clientId;
_pulsarClient = createPulsarClient();
}

private PulsarClient createPulsarClient() {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
try {
ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
String tlsTrustCertsFilePath = _config.getTlsTrustCertsFilePath();
if (StringUtils.isNotBlank(tlsTrustCertsFilePath)) {
pulsarClientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
String authenticationToken = _config.getAuthenticationToken();
if (StringUtils.isNotBlank(authenticationToken)) {
pulsarClientBuilder.authentication(AuthenticationFactory.token(authenticationToken));
} else {
String issuerUrl = _config.getIssuerUrl();
String credentialsFilePath = _config.getCredentialsFilePath();
String audience = _config.getAudience();
if (StringUtils.isNotBlank(issuerUrl) && StringUtils.isNotBlank(credentialsFilePath) && StringUtils.isNotBlank(
audience)) {
pulsarClientBuilder.authentication(
AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsFilePath),
audience));
}
}
_pulsarClient = pulsarClientBuilder.build();
Optional.ofNullable(_config.getTlsTrustCertsFilePath())
.filter(StringUtils::isNotBlank)
.ifPresent(clientBuilder::tlsTrustCertsFilePath);
Optional.ofNullable(authenticationConfig()).ifPresent(clientBuilder::authentication);
return clientBuilder.build();
} catch (Exception e) {
throw new RuntimeException("Caught exception while creating Pulsar client", e);
}
}

protected PulsarAdmin createPulsarAdmin() {
checkArgument(StringUtils.isNotBlank(_config.getServiceHttpUrl()),
"Service HTTP URL must be provided to perform admin operations");

PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(_config.getServiceHttpUrl());
try {
Optional.ofNullable(_config.getTlsTrustCertsFilePath())
.filter(StringUtils::isNotBlank)
.ifPresent(adminBuilder::tlsTrustCertsFilePath);
Optional.ofNullable(authenticationConfig()).ifPresent(adminBuilder::authentication);
return adminBuilder.build();
} catch (Exception e) {
throw new RuntimeException("Caught exception while creating Pulsar admin", e);
}
}

/**
* Creates and returns an {@link Authentication} object based on the configuration.
*
* @return an Authentication object
*/
private Authentication authenticationConfig()
throws MalformedURLException {
String authenticationToken = _config.getAuthenticationToken();
if (StringUtils.isNotBlank(authenticationToken)) {
return AuthenticationFactory.token(authenticationToken);
} else {
return oAuth2AuthenticationConfig();
}
}

/**
* Creates and returns an OAuth2 {@link Authentication} object.
*
* @return an OAuth2 Authentication object
*/
private Authentication oAuth2AuthenticationConfig()
throws MalformedURLException {
String issuerUrl = _config.getIssuerUrl();
String credentialsFilePath = _config.getCredentialsFilePath();
String audience = _config.getAudience();

if (StringUtils.isNotBlank(issuerUrl) && StringUtils.isNotBlank(credentialsFilePath) && StringUtils.isNotBlank(
audience)) {
return AuthenticationFactoryOAuth2.clientCredentials(new URL(issuerUrl), new URL(credentialsFilePath),
audience);
}
return null;
}

@Override
public void close()
throws IOException {
_pulsarClient.close();
if (_pulsarClient != null) {
_pulsarClient.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -177,4 +178,41 @@ public void close()
throws IOException {
super.close();
}

@Override
public List<TopicMetadata> getTopics() {
try (PulsarAdmin pulsarAdmin = createPulsarAdmin()) {
// List to store all topics
List<TopicMetadata> allTopics = new ArrayList<>();

for (String tenant : pulsarAdmin.tenants().getTenants()) {
for (String namespace : pulsarAdmin.namespaces().getNamespaces(tenant)) {
// Fetch all topics for the namespace
List<String> topicNames = pulsarAdmin.topics().getList(namespace);

// Map topics to PulsarTopicMetadata and add to the list
topicNames.stream()
.map(topicName -> new PulsarTopicMetadata().setName(topicName))
.forEach(allTopics::add);
}
}

return allTopics;
} catch (Exception e) {
throw new RuntimeException("Failed to list Pulsar topics across all tenants and namespaces", e);
}
}

public static class PulsarTopicMetadata implements TopicMetadata {
private String _name;

public String getName() {
return _name;
}

public PulsarTopicMetadata setName(String name) {
_name = name;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -153,6 +155,7 @@ public StreamConfig getStreamConfig(String topicName) {
streamConfigMap.put("stream.pulsar.consumer.type", "simple");
streamConfigMap.put("stream.pulsar.topic.name", topicName);
streamConfigMap.put("stream.pulsar.bootstrap.servers", _pulsar.getPulsarBrokerUrl());
streamConfigMap.put("stream.pulsar.serviceHttpUrl", _pulsar.getHttpServiceUrl());
streamConfigMap.put("stream.pulsar.consumer.prop.auto.offset.reset", "smallest");
streamConfigMap.put("stream.pulsar.consumer.factory.class.name", PulsarConsumerFactory.class.getName());
streamConfigMap.put("stream.pulsar.decoder.class.name", "dummy");
Expand Down Expand Up @@ -208,6 +211,18 @@ public void testPartitionLevelConsumerBatchMessages()
}
}

@Test
public void testGetTopics() throws Exception {
try (PulsarStreamMetadataProvider metadataProvider = new PulsarStreamMetadataProvider(CLIENT_ID,
getStreamConfig("NON_EXISTING_TOPIC"))) {
List<StreamMetadataProvider.TopicMetadata> topics = metadataProvider.getTopics();
List<String> topicNames = topics.stream()
.map(StreamMetadataProvider.TopicMetadata::getName)
.collect(Collectors.toList());
assertTrue(topicNames.size() == 4);
}
}

private void testConsumer(PulsarPartitionLevelConsumer consumer, int startIndex, List<MessageId> messageIds) {
MessageId startMessageId = startIndex == 0 ? MessageId.earliest : messageIds.get(startIndex);
int numMessagesFetched = startIndex;
Expand Down

0 comments on commit 7cea247

Please sign in to comment.