Skip to content

Commit

Permalink
feat(rosenet-node): add timeout for direct message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mkermani144 committed Oct 4, 2024
1 parent 61043f7 commit e998cb1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
5 changes: 5 additions & 0 deletions .changeset/strange-months-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Add timeout for handling incoming direct messages
43 changes: 26 additions & 17 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
BulkheadPolicy,
BulkheadRejectedError,
wrap,
timeout,
TimeoutStrategy,
} from 'cockatiel';
import { pipe } from 'it-pipe';

Expand All @@ -19,6 +21,7 @@ import {
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
ROSENET_DIRECT_PROTOCOL_V1,
MESSAGE_HANDLING_TIMEOUT,
} from '../constants';

const messageHandlingBulkhead = bulkhead(
Expand Down Expand Up @@ -61,26 +64,32 @@ const handleIncomingMessageFactory =
try {
await wrappedPolicy.execute(async () => {
try {
await pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
const messageHandlingTimeout = timeout(
MESSAGE_HANDLING_TIMEOUT,
TimeoutStrategy.Aggressive,
);
await messageHandlingTimeout.execute(() =>
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
),
);
} catch (error) {
RoseNetNodeContext.logger.warn(
'An error occurred while reading from stream',
'An error occurred while handling incoming message',
{
error,
},
Expand Down

0 comments on commit e998cb1

Please sign in to comment.