Skip to content

Commit

Permalink
PROTON-2845 Add more test peer API for Transfer send and expect
Browse files Browse the repository at this point in the history
Adds more APIs to fill in the missing gaps for scripting transfers and
the response dispositions that can be sent back.
  • Loading branch information
tabish121 committed Aug 6, 2024
1 parent e65f05b commit 37efe68
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ public DispositionInjectAction rejected(String condition, String description) {
return DispositionInjectAction.this;
}

public DispositionInjectAction rejected(Symbol condition, String description) {
withState(new Rejected().setError(new ErrorCondition(condition, description)));
return DispositionInjectAction.this;
}

public DispositionInjectAction rejected(String condition, String description, Map<String, Object> info) {
withState(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info))));
return DispositionInjectAction.this;
}

public DispositionInjectAction rejected(Symbol condition, String description, Map<Symbol, Object> info) {
withState(new Rejected().setError(new ErrorCondition(condition, description, info)));
return DispositionInjectAction.this;
}

public DispositionInjectAction modified() {
withState(new Modified());
return DispositionInjectAction.this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.CoreMatchers.nullValue;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -42,7 +43,13 @@
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ReleasedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.ErrorConditionMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMessageMatcher;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -115,6 +122,14 @@ public DispositionInjectAction reject(Symbol condition, String description) {
return reject(new ErrorCondition(condition, description));
}

public DispositionInjectAction reject(String condition, String description, Map<String, Object> info) {
return reject(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info)));
}

public DispositionInjectAction reject(Symbol condition, String description, Map<Symbol, Object> info) {
return reject(new ErrorCondition(condition, description, info));
}

public DispositionInjectAction reject(ErrorCondition error) {
response = new DispositionInjectAction(driver);
response.withSettled(true);
Expand All @@ -129,9 +144,21 @@ public DispositionInjectAction modify(boolean failed) {
}

public DispositionInjectAction modify(boolean failed, boolean undeliverable) {
return modify(failed, undeliverable, null);
}

public DispositionInjectAction modify(boolean failed, boolean undeliverable, Map<String, Object> annotations) {
final Modified modified = new Modified();

modified.setDeliveryFailed(failed);
modified.setUndeliverableHere(undeliverable);
if (annotations != null) {
modified.setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations));
}

response = new DispositionInjectAction(driver);
response.withSettled(true);
response.withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverable));
response.withState(modified);

driver.addScriptedElement(response);
return response;
Expand Down Expand Up @@ -370,37 +397,87 @@ protected Class<Transfer> getExpectedTypeClass() {
public final class DeliveryStateBuilder {

public TransferExpectation accepted() {
withState(Accepted.getInstance());
withState(new AcceptedMatcher());
return TransferExpectation.this;
}

public TransferExpectation released() {
withState(Released.getInstance());
withState(new ReleasedMatcher());
return TransferExpectation.this;
}

public TransferExpectation rejected() {
withState(new Rejected());
withState(new RejectedMatcher());
return TransferExpectation.this;
}

public TransferExpectation rejected(String condition, String description) {
withState(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description)));
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, String description) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return TransferExpectation.this;
}

public TransferExpectation rejected(String condition, Matcher<?> description) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, Matcher<?> description) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return TransferExpectation.this;
}

public TransferExpectation rejected(String condition, String description, Map<String, Object> info) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, String description, Map<Symbol, Object> info) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, String description, Matcher<?> info) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, Matcher<?> description, Map<Symbol, Object> info) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info)));
return TransferExpectation.this;
}

public TransferExpectation rejected(Symbol condition, Matcher<?> description, Matcher<?> info) {
withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return TransferExpectation.this;
}

public TransferExpectation modified() {
withState(new Modified());
withState(new ModifiedMatcher());
return TransferExpectation.this;
}

public TransferExpectation modified(boolean failed) {
withState(new Modified());
withState(new ModifiedMatcher().withDeliveryFailed(failed));
return TransferExpectation.this;
}

public TransferExpectation modified(boolean failed, boolean undeliverableHere) {
withState(new Modified());
withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
return TransferExpectation.this;
}

public TransferExpectation modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return TransferExpectation.this;
}

public TransferExpectation modified(boolean failed, boolean undeliverableHere, Matcher<?> annotations) {
withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return TransferExpectation.this;
}

Expand Down Expand Up @@ -464,37 +541,92 @@ public TransferTransactionalStateMatcher withOutcome(Matcher<?> m) {
// ----- Add a layer to allow configuring the outcome without specific type dependencies

public TransferTransactionalStateMatcher withAccepted() {
super.withOutcome(Accepted.getInstance());
super.withOutcome(new AcceptedMatcher());
return this;
}

public TransferTransactionalStateMatcher withReleased() {
super.withOutcome(Released.getInstance());
super.withOutcome(new ReleasedMatcher());
return this;
}

public TransferTransactionalStateMatcher withRejected() {
super.withOutcome(new Rejected());
super.withOutcome(new RejectedMatcher());
return this;
}

public TransferTransactionalStateMatcher withRejected(String condition, String description) {
super.withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description)));
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return this;
}

public TransferTransactionalStateMatcher withRejected(Symbol condition, String description) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return this;
}

public TransferTransactionalStateMatcher withRejected(String condition, Matcher<?> description) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return this;
}

public TransferTransactionalStateMatcher withRejected(Symbol condition, Matcher<?> description) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)));
return this;
}

public TransferTransactionalStateMatcher withRejected(String condition, String description, Map<String, Object> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}

public TransferTransactionalStateMatcher withRejected(Symbol condition, String description, Map<Symbol, Object> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info)));
return this;
}

public TransferTransactionalStateMatcher withRejected(String condition, String description, Matcher<?> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}

public TransferTransactionalStateMatcher withRejected(Symbol condition, String description, Matcher<?> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}

public TransferTransactionalStateMatcher withRejected(String condition, Matcher<?> description, Matcher<?> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}

public TransferTransactionalStateMatcher withRejected(Symbol condition, Matcher<?> description, Matcher<?> info) {
super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}

public TransferTransactionalStateMatcher withModified() {
super.withOutcome(new Modified());
super.withOutcome(new ModifiedMatcher());
return this;
}

public TransferTransactionalStateMatcher withModified(boolean failed) {
super.withOutcome(new Modified().setDeliveryFailed(failed));
super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed));
return this;
}

public TransferTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere) {
super.withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
return this;
}

public TransferTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) {
super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return this;
}

public TransferTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere, Matcher<?> annotations) {
super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,66 @@ public void testTransferInjectAndExpectAPIs() throws Exception {
}
}

@Test
public void testTransferInjectAndExpectAPIsForGenericMatches() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer();
ProtonTestClient client = new ProtonTestClient()) {

peer.expectAMQPHeader().respondWithAMQPHeader();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender().respond().withHandle(42);
peer.remoteFlow().withLinkCredit(1).queue();
// Script a full message using the inject API
peer.expectTransfer().withMessage()
.withProperties().and()
.withDeliveryAnnotations().also()
.withApplicationProperties().and()
.withMessageAnnotations().also()
.withData(new byte[] {0, 1, 2})
.withHeader().and()
.withFooters();
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.start();

URI remoteURI = peer.getServerURI();

LOG.info("Test started, peer listening on: {}", remoteURI);

client.connect(remoteURI.getHost(), remoteURI.getPort());
client.expectAMQPHeader();
client.expectOpen();
client.expectBegin();
client.expectAttach().ofReceiver().withHandle(42);
client.expectFlow().withLinkCredit(1).withHandle(42);
client.remoteTransfer().withHeader().withDurability(true).also()
.withApplicationProperties().withProperty("ap", "pa").also()
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
.withProperties().withCorrelationId("test").also()
.withMessageAnnotations().withAnnotation("ma", "am").also()
.withFooter().withFooter("footer", "value").also()
.withBody().withData(new byte[] {0, 1, 2}).also()
.queue();

// Now start and then await the remote grant of credit and out send of a transfer
client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
client.remoteOpen().now();
client.remoteBegin().now();
client.remoteAttach().ofSender().withHandle(2).now();

client.waitForScriptToComplete(5, TimeUnit.SECONDS);
client.expectDetach().withHandle(42);
client.expectEnd();

client.remoteDetach().now();
client.remoteEnd().now();

client.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}

@Test
public void testTransferInjectAndExpectAPIsFailOnNoMatchInHeader() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer();
Expand Down

0 comments on commit 37efe68

Please sign in to comment.