Skip to content

Commit

Permalink
SolaceIO: separate auth and session settings (apache#32406)
Browse files Browse the repository at this point in the history
* Refactored to separate authentication and session settings, and allow inheritance and overriding of SessionService

* Improve methods' javadoc
  • Loading branch information
bzablocki authored Jan 7, 2025
1 parent 1d5a3e8 commit f17bb8d
Show file tree
Hide file tree
Showing 10 changed files with 345 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@
*
* <p>See {@link Read#withSessionServiceFactory(SessionServiceFactory)} for session authentication.
* The connector provides implementation of the {@link SessionServiceFactory} using the Basic
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService}.
* Authentication: {@link org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory}.
*
* <p>For the authentication to the SEMP API ({@link Read#withSempClientFactory(SempClientFactory)})
* the connector provides {@link org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory} to
Expand Down Expand Up @@ -639,9 +639,8 @@ public Read<T> withSempClientFactory(SempClientFactory sempClientFactory) {
* <li>create a {@link org.apache.beam.sdk.io.solace.broker.MessageReceiver}.
* </ul>
*
* <p>An existing implementation of the SempClientFactory includes {@link
* org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionService} which implements the Basic
* Authentication to Solace. *
* <p>The {@link BasicAuthJcsmpSessionServiceFactory} is an existing implementation of the
* {@link SessionServiceFactory} which implements the Basic Authentication to Solace.
*
* <p>To use it, specify the credentials with the builder methods. *
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.JCSMPProperties;

/**
* A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
* A factory for creating {@link JcsmpSessionService} instances. Extends {@link
* SessionServiceFactory}.
*
* <p>This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
* authenticate to Solace with Basic Authentication.
* <p>This factory provides a way to create {@link JcsmpSessionService} that use Basic
* Authentication.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
Expand Down Expand Up @@ -69,15 +70,14 @@ public abstract static class Builder {

@Override
public SessionService create() {
BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder();
if (queue != null) {
builder = builder.queueName(queue.getName());
}
return builder
.host(host())
.username(username())
.password(password())
.vpnName(vpnName())
.build();
JCSMPProperties jcsmpProperties = new JCSMPProperties();
jcsmpProperties.setProperty(JCSMPProperties.VPN_NAME, vpnName());
jcsmpProperties.setProperty(
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, username());
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, password());
jcsmpProperties.setProperty(JCSMPProperties.HOST, host());

return JcsmpSessionService.create(jcsmpProperties, getQueue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,45 +43,16 @@
/**
* A class that manages a connection to a Solace broker using basic authentication.
*
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
* connection is established using basic authentication.
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionService extends SessionService {
public abstract class JcsmpSessionService extends SessionService {

/** The name of the queue to receive messages from. */
public abstract @Nullable String queueName();
/** JCSMP properties used to establish the connection. */
abstract JCSMPProperties jcsmpProperties();

/** The host name or IP address of the Solace broker. Format: Host[:Port] */
public abstract String host();

/** The username to use for authentication. */
public abstract String username();

/** The password to use for authentication. */
public abstract String password();

/** The name of the VPN to connect to. */
public abstract String vpnName();

public static Builder builder() {
return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME);
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder queueName(@Nullable String queueName);

public abstract Builder host(String host);

public abstract Builder username(String username);

public abstract Builder password(String password);

public abstract Builder vpnName(String vpnName);

public abstract BasicAuthJcsmpSessionService build();
}
/** The Queue to receive messages from. */
abstract @Nullable Queue queue();

@Nullable private transient JCSMPSession jcsmpSession;
@Nullable private transient MessageReceiver messageReceiver;
Expand All @@ -90,9 +61,19 @@ public abstract static class Builder {
new ConcurrentLinkedQueue<>();
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

public static JcsmpSessionService create(JCSMPProperties jcsmpProperties, @Nullable Queue queue) {
return new AutoValue_JcsmpSessionService(jcsmpProperties, queue);
}

@Override
public JCSMPProperties getSessionProperties() {
return jcsmpProperties();
}

@Override
public void connect() {
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
retryCallableManager.retryCallable(
this::connectReadSession, ImmutableSet.of(JCSMPException.class));
}

@Override
Expand Down Expand Up @@ -158,10 +139,7 @@ private MessageProducer createXMLMessageProducer(SubmissionMode submissionMode)
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {

Queue queue =
JCSMPFactory.onlyInstance()
.createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set."));
Queue queue = checkStateNotNull(queue(), "SolaceIO.Read: Queue is not set.");

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
Expand Down Expand Up @@ -190,9 +168,9 @@ private static FlowReceiver createFlowReceiver(
return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
}

private int connectSession() throws JCSMPException {
private int connectReadSession() throws JCSMPException {
if (jcsmpSession == null) {
jcsmpSession = createSessionObject();
jcsmpSession = createReadSessionObject();
}
jcsmpSession.connect();
return 0;
Expand All @@ -206,25 +184,12 @@ private int connectWriteSession(SubmissionMode mode) throws JCSMPException {
return 0;
}

private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = initializeSessionProperties(new JCSMPProperties());
return JCSMPFactory.onlyInstance().createSession(properties);
private JCSMPSession createReadSessionObject() throws InvalidPropertiesException {
return JCSMPFactory.onlyInstance().createSession(jcsmpProperties());
}

private JCSMPSession createWriteSessionObject(SubmissionMode mode)
throws InvalidPropertiesException {
return JCSMPFactory.onlyInstance().createSession(initializeWriteSessionProperties(mode));
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName());

baseProps.setProperty(
JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC);
baseProps.setProperty(JCSMPProperties.USERNAME, username());
baseProps.setProperty(JCSMPProperties.PASSWORD, password());
baseProps.setProperty(JCSMPProperties.HOST, host());
return baseProps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
* messaging system. It allows for establishing a connection, creating a message-receiver object,
* checking if the connection is closed or not, and gracefully closing the session.
*
* <p>Override this class and the method {@link #initializeSessionProperties(JCSMPProperties)} with
* your specific properties, including all those related to authentication.
* <p>Override this class and the method {@link #getSessionProperties()} with your specific
* properties, including all those related to authentication.
*
* <p>The connector will call the method only once per session created, so you can perform
* relatively heavy operations in that method (e.g. connect to a store or vault to retrieve
Expand Down Expand Up @@ -70,8 +70,7 @@
* <p>The connector ensures that no two threads will be calling that method at the same time, so you
* don't have to take any specific precautions to avoid race conditions.
*
* <p>For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link
* BasicAuthJcsmpSessionServiceFactory}.
* <p>For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}.
*
* <p>For other situations, you need to extend this class and implement the `equals` method, so two
* instances of your class can be compared by value. We recommend using AutoValue for that. For
Expand Down Expand Up @@ -143,11 +142,7 @@ public abstract class SessionService implements Serializable {

/**
* Override this method and provide your specific properties, including all those related to
* authentication, and possibly others too. The {@code}baseProperties{@code} parameter sets the
* Solace VPN to "default" if none is specified.
*
* <p>You should add your properties to the parameter {@code}baseProperties{@code}, and return the
* result.
* authentication, and possibly others too.
*
* <p>The method will be used whenever the session needs to be created or refreshed. If you are
* setting credentials with expiration, just make sure that the latest available credentials (e.g.
Expand All @@ -160,7 +155,7 @@ public abstract class SessionService implements Serializable {
* href="https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html">https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPProperties.html</a>
* </ul>
*/
public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
public abstract JCSMPProperties getSessionProperties();

/**
* You need to override this method to be able to compare these objects by value. We recommend
Expand All @@ -187,29 +182,30 @@ public abstract class SessionService implements Serializable {
* token), this method will be called again.
*/
public final JCSMPProperties initializeWriteSessionProperties(SolaceIO.SubmissionMode mode) {
JCSMPProperties jcsmpProperties = initializeSessionProperties(getDefaultProperties());
JCSMPProperties jcsmpProperties = getSessionProperties();
return overrideConnectorProperties(jcsmpProperties, mode);
}

private static JCSMPProperties getDefaultProperties() {
JCSMPProperties props = new JCSMPProperties();
props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
// Outgoing messages will have a sender timestamp field populated
props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
// Make XMLProducer safe to access from several threads. This is the default value, setting
// it just in case.
props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);

return props;
}

/**
* This method overrides some properties for the broker session to prevent misconfiguration,
* taking into account how the write connector works.
*/
private static JCSMPProperties overrideConnectorProperties(
JCSMPProperties props, SolaceIO.SubmissionMode mode) {

if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
props.setProperty(JCSMPProperties.VPN_NAME, DEFAULT_VPN_NAME);
}
if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
// Outgoing messages will have a sender timestamp field populated
props.setProperty(JCSMPProperties.GENERATE_SEND_TIMESTAMPS, true);
}
if (props.getProperty(JCSMPProperties.VPN_NAME) == null) {
// Make XMLProducer safe to access from several threads. This is the default value, setting
// it just in case.
props.setProperty(JCSMPProperties.PUB_MULTI_THREAD, true);
}

// PUB_ACK_WINDOW_SIZE heavily affects performance when publishing persistent
// messages. It can be a value between 1 and 255. This is the batch size for the ack
// received from Solace. A value of 1 will have the lowest latency, but a very low
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.solacesystems.jcsmp.Queue;
import java.io.Serializable;
import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode;
import org.apache.beam.sdk.io.solace.data.Solace;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -61,7 +62,7 @@ public abstract class SessionServiceFactory implements Serializable {
* This could be used to associate the created SessionService with a specific queue for message
* handling.
*/
@Nullable Queue queue;
private @Nullable Queue queue;

/**
* The write submission mode. This is set when the writers are created. This property is used only
Expand All @@ -75,6 +76,33 @@ public abstract class SessionServiceFactory implements Serializable {
*/
public abstract SessionService create();

/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
* to set the Queue reference based on {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}. The queue can be retrieved in
* the classes that inherit {@link SessionServiceFactory} with the getter method {@link
* SessionServiceFactory#getQueue()}
*/
public final void setQueue(Queue queue) {
this.queue = queue;
}

/**
* Getter for the queue. This is nullable, because at the construction time this reference is
* null. Once the pipeline is compiled and the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
* is called, this reference is valid.
*
* @return a reference to the queue which is set with the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Queue)} or {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#from(Solace.Topic)}
*/
public final @Nullable Queue getQueue() {
return queue;
}

/**
* You need to override this method to be able to compare these objects by value. We recommend
* using AutoValue for that.
Expand All @@ -89,15 +117,6 @@ public abstract class SessionServiceFactory implements Serializable {
@Override
public abstract int hashCode();

/**
* This method is called in the {@link
* org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method
* to set the Queue reference.
*/
public void setQueue(Queue queue) {
this.queue = queue;
}

/**
* Called by the write connector to set the submission mode used to create the message producers.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void connect() {
}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
public JCSMPProperties getSessionProperties() {
throw new UnsupportedOperationException(exceptionMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public Queue<PublishResult> getPublishedResultsQueue() {
public void connect() {}

@Override
public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) {
public JCSMPProperties getSessionProperties() {
// Let's override some properties that will be overriden by the connector
// Opposite of the mode, to test that is overriden
JCSMPProperties baseProperties = new JCSMPProperties();
baseProperties.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, callbackOnReactor);

baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, ackWindowSizeForTesting);

return baseProperties;
Expand Down
Loading

0 comments on commit f17bb8d

Please sign in to comment.