Skip to content

Commit

Permalink
Merge branch 'feat/throughput-improvement' into 'dev'
Browse files Browse the repository at this point in the history
feat: add fail fast feature for sending messages

Closes #90

See merge request ergo/rosen-bridge/rosenet!47
  • Loading branch information
vorujack committed Oct 12, 2024
2 parents bb73d3e + c4c4ce3 commit c7e109f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .changeset/red-mangos-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Enable fail fast when the message sending bulkhead's execution slots become nearly full
15 changes: 15 additions & 0 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ export const MESSAGE_HANDLING_TIMEOUT = 2000;
* The number of times we attempt to re-send failed messages
*/
export const MESSAGE_RETRY_ATTEMPTS = 5;
/**
* The number of times we attempt to re-send failed messages when in fail fast
* mode. It is the fail fast counterpart of `MESSAGE_RETRY_ATTEMPTS` (5) and is
* deliberately much lower, so that a failing message is given up on sooner.
*/
export const FAIL_FAST_MESSAGE_RETRY_ATTEMPTS = 1;
/**
* Initial delay after which we retry sending a failed message. Used as the
* `initialDelay` of the exponential backoff of the message sending retry
* policy (presumably in milliseconds; confirm against the backoff
* implementation).
*/
export const MESSAGE_RETRY_INITIAL_DELAY = 2000;
/**
* Initial delay after which we retry sending a failed message when in fail fast
* mode. Note that this is longer than `MESSAGE_RETRY_INITIAL_DELAY` (2000):
* in fail fast mode we retry fewer times and wait longer before retrying
* (presumably in milliseconds; confirm against the backoff implementation).
*/
export const FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY = 5000;
/**
* Maximum number of incoming RoseNet Direct messages that can be handled
* concurrently
Expand Down Expand Up @@ -95,3 +105,8 @@ export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400;
* upgrade) can take
*/
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000;
/**
* Threshold for enabling fail fast when sending messages, resulting in fewer
* retry attempts to prevent the queues from becoming full. It is one quarter
* of `MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT` and is compared against the
* execution slots of the message sending bulkhead.
*/
export const FAIL_FAST_THRESHOLD = MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT / 4;
27 changes: 22 additions & 5 deletions packages/rosenet-node/lib/rosenet-direct/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import RoseNetNodeError from '../errors/RoseNetNodeError';

import {
ACK_BYTE,
MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
FAIL_FAST_MESSAGE_RETRY_ATTEMPTS,
FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY,
FAIL_FAST_THRESHOLD,
MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE,
MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
MESSAGE_RETRY_ATTEMPTS,
MESSAGE_RETRY_INITIAL_DELAY,
MESSAGE_ROUNDTRIP_TIMEOUT,
Expand Down Expand Up @@ -141,10 +144,20 @@ const sendMessageWithRetryAndBulkheadFactory =
onSettled?: (error?: Error) => Promise<void>,
) => {
const sendMessageInner = sendMessageFactory(node);

// `executionSlots` reports how many execution slots of the bulkhead are still
// FREE, so fail fast must be enabled when that number has dropped to the
// threshold or below (i.e. the bulkhead is nearly full), not when it is above
// it. The previous `>` comparison made this flag the opposite of its name,
// which was compensated for by swapped ternary branches below but leaked into
// the `isFailFastEnabled` value reported when a message is dropped.
const shouldFailFast = bulkheadPolicy.executionSlots <= FAIL_FAST_THRESHOLD;

// The effective retry settings for any given number of free slots are the
// same as before; only the meaning of `shouldFailFast` has been corrected.
const maxAttempts = shouldFailFast
  ? FAIL_FAST_MESSAGE_RETRY_ATTEMPTS
  : MESSAGE_RETRY_ATTEMPTS;
const initialDelay = shouldFailFast
  ? FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY
  : MESSAGE_RETRY_INITIAL_DELAY;

const retryPolicy = retry(handleAll, {
maxAttempts: MESSAGE_RETRY_ATTEMPTS,
maxAttempts,
backoff: new ExponentialBackoff({
initialDelay: MESSAGE_RETRY_INITIAL_DELAY,
initialDelay,
}),
});
retryPolicy.onFailure((data) => {
Expand All @@ -167,9 +180,13 @@ const sendMessageWithRetryAndBulkheadFactory =
wrappedPolicy
.execute(() => sendMessageInner(to, message))
.then(() => onSettled?.())
.catch(() => {
.catch((error) => {
RoseNetNodeContext.logger.warn(
'Message sending failed regardless of 3 retries, dropping message',
'Message sending failed, dropping message',
{
lastOccurredError: error,
isFailFastEnabled: shouldFailFast,
},
);
RoseNetNodeContext.logger.debug('Message was: ', {
message,
Expand Down

0 comments on commit c7e109f

Please sign in to comment.