diff --git a/.changeset/afraid-lizards-argue.md b/.changeset/afraid-lizards-argue.md new file mode 100644 index 0000000..b282a1e --- /dev/null +++ b/.changeset/afraid-lizards-argue.md @@ -0,0 +1,5 @@ +--- +'@rosen-bridge/rosenet-node': minor +--- + +Add pubsub limits diff --git a/packages/rosenet-node/lib/constants.ts b/packages/rosenet-node/lib/constants.ts index 75b740a..b352f8e 100644 --- a/packages/rosenet-node/lib/constants.ts +++ b/packages/rosenet-node/lib/constants.ts @@ -4,13 +4,18 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000; export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1'; export const DEFAULT_NODE_PORT = 55123; export const ACK_BYTE = 1; -export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000; +export const MESSAGE_ROUNDTRIP_TIMEOUT = 5000; +export const MESSAGE_HANDLING_TIMEOUT = 2000; export const MESSAGE_RETRY_ATTEMPTS = 5; export const MESSAGE_RETRY_INITIAL_DELAY = 2000; -export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 1000; -export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000; -export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 100; -export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 200; -export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 1000; -export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000; -export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500; +export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100; +export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200; +export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10; +export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20; +export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200; +export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400; +export const MAX_INBOUND_PUBSUB_THROUGHPUT = 100; +export const MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200; +export const MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200; +export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400; +export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts index 0c9af89..34da2e9 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts @@ -1,14 +1,39 @@ import { Libp2p, PubSub } from '@libp2p/interface'; +import { bulkhead, isBulkheadRejectedError } from 'cockatiel'; + +import RoseNetNodeContext from '../context/RoseNetNodeContext'; + +import { + MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, + MAX_OUTBOUND_PUBSUB_THROUGHPUT, +} from '../constants'; const textEncoder = new TextEncoder(); +const bulkheadPolicy = bulkhead( + MAX_OUTBOUND_PUBSUB_THROUGHPUT, + MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, +); + /** * factory for libp2p publish */ const publishFactory = (node: Libp2p<{ pubsub: PubSub }>) => async (topic: string, message: string) => { - node.services.pubsub.publish(topic, textEncoder.encode(message)); + try { + await bulkheadPolicy.execute(() => + node.services.pubsub.publish(topic, textEncoder.encode(message)), + ); + } catch (error) { + if (isBulkheadRejectedError(error)) { + RoseNetNodeContext.logger.warn('Maximum publish threshold reached'); + } else { + RoseNetNodeContext.logger.warn('Message publish failed', { + message, + }); + } + } }; export default publishFactory; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts index 05829fa..e27d4d6 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts @@ -1,7 +1,20 @@ import { Libp2p, PubSub } from '@libp2p/interface'; +import { bulkhead } from 'cockatiel'; + +import RoseNetNodeContext from '../context/RoseNetNodeContext'; + +import { + MAX_INBOUND_PUBSUB_QUEUE_SIZE, + MAX_INBOUND_PUBSUB_THROUGHPUT, +} from '../constants'; const textDecoder = new TextDecoder(); +const bulkheadPolicy = bulkhead( + MAX_INBOUND_PUBSUB_THROUGHPUT, + MAX_INBOUND_PUBSUB_QUEUE_SIZE, +); + /** * factory for libp2p subscribe */ @@ -9,9 +22,17 @@ const subscribeFactory = (node: Libp2p<{ pubsub: PubSub }>) => async (topic: string, handler: (message: string) => void) => { node.services.pubsub.subscribe(topic); - node.services.pubsub.addEventListener('message', (event) => { - if (event.detail.topic === topic) { - handler(textDecoder.decode(event.detail.data)); + node.services.pubsub.addEventListener('message', async (event) => { + try { + await bulkheadPolicy.execute(() => { + if (event.detail.topic === topic) { + handler(textDecoder.decode(event.detail.data)); + } + }); + } catch { + RoseNetNodeContext.logger.warn( + 'Maximum pubsub message handling threshold reached', + ); } }); };