Skip to content

Commit

Permalink
feat(rosenet-node): add bulkheads to pubsub APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
mkermani144 committed Oct 4, 2024
1 parent e998cb1 commit 2d87139
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 12 deletions.
5 changes: 5 additions & 0 deletions .changeset/afraid-lizards-argue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Add pubsub limits
21 changes: 13 additions & 8 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000;
export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1';
export const DEFAULT_NODE_PORT = 55123;
export const ACK_BYTE = 1;
export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000;
export const MESSAGE_ROUNDTRIP_TIMEOUT = 5000;
export const MESSAGE_HANDLING_TIMEOUT = 2000;
export const MESSAGE_RETRY_ATTEMPTS = 5;
export const MESSAGE_RETRY_INITIAL_DELAY = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 100;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 200;
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20;
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200;
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400;
export const MAX_INBOUND_PUBSUB_THROUGHPUT = 100;
export const MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200;
export const MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200;
export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400;
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000;
27 changes: 26 additions & 1 deletion packages/rosenet-node/lib/rosenet-pubsub/publish.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead, isBulkheadRejectedError } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import {
MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
MAX_OUTBOUND_PUBSUB_THROUGHPUT,
} from '../constants';

const textEncoder = new TextEncoder();

const bulkheadPolicy = bulkhead(
MAX_OUTBOUND_PUBSUB_THROUGHPUT,
MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
);

/**
* factory for libp2p publish
*/
const publishFactory =
(node: Libp2p<{ pubsub: PubSub }>) =>
async (topic: string, message: string) => {
node.services.pubsub.publish(topic, textEncoder.encode(message));
try {
await bulkheadPolicy.execute(() =>
node.services.pubsub.publish(topic, textEncoder.encode(message)),
);
} catch (error) {
if (isBulkheadRejectedError(error)) {
RoseNetNodeContext.logger.warn('Maximum publish threshold reached');
} else {
RoseNetNodeContext.logger.warn('Message publish failed', {
message,
});
}
}
};

export default publishFactory;
27 changes: 24 additions & 3 deletions packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
import { Libp2p, PubSub } from '@libp2p/interface';
import { bulkhead } from 'cockatiel';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import {
MAX_INBOUND_PUBSUB_QUEUE_SIZE,
MAX_INBOUND_PUBSUB_THROUGHPUT,
} from '../constants';

const textDecoder = new TextDecoder();

const bulkheadPolicy = bulkhead(
MAX_INBOUND_PUBSUB_THROUGHPUT,
MAX_INBOUND_PUBSUB_QUEUE_SIZE,
);

/**
* factory for libp2p subscribe
*/
const subscribeFactory =
(node: Libp2p<{ pubsub: PubSub }>) =>
async (topic: string, handler: (message: string) => void) => {
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', (event) => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
node.services.pubsub.addEventListener('message', async (event) => {
try {
await bulkheadPolicy.execute(() => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
}
});
} catch {
RoseNetNodeContext.logger.warn(
'Maximum pubsub message handling threshold reached',
);
}
});
};
Expand Down

0 comments on commit 2d87139

Please sign in to comment.