Skip to content

Commit

Permalink
Merge branch 'fix/pubsub-issues' into 'dev'
Browse files Browse the repository at this point in the history
fix: pubsub issues

Closes #73

See merge request ergo/rosen-bridge/rosenet!32
  • Loading branch information
vorujack committed Sep 11, 2024
2 parents 0282f68 + e971eb0 commit 5e924f6
Show file tree
Hide file tree
Showing 10 changed files with 890 additions and 497 deletions.
6 changes: 6 additions & 0 deletions .changeset/tender-cheetahs-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@rosen-bridge/rosenet-relay': patch
'@rosen-bridge/rosenet-node': patch
---

Fix issues of pubsub causing nodes and relays connection to be abrupted
6 changes: 5 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
node_modules
**/node_modules
# **/dist
**/coverage
**/.terraform
packages-legacy
1,151 changes: 677 additions & 474 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export const RELAYS_COUNT_TO_CONNECT = 3;
export const MIN_RELAYS = 2;
export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000;
export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1';
export const DEFAULT_NODE_PORT = 55123;
21 changes: 18 additions & 3 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {

import RoseNetNodeContext from './context/RoseNetNodeContext';

import restartRelayDiscovery from './libp2p/restart-relay-discovery';

import addressService from './address/address-service';
import streamService from './stream/stream-service';

Expand All @@ -37,6 +39,9 @@ import packageJson from '../package.json' with { type: 'json' };

import { RoseNetNodeConfig } from './types';

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

const createRoseNetNode = async ({
logger,
port = DEFAULT_NODE_PORT,
Expand Down Expand Up @@ -102,6 +107,9 @@ const createRoseNetNode = async ({
}),
},
streamMuxers: [yamux()],
connectionManager: {
minConnections: 0,
},
peerDiscovery: [
bootstrap({
list: sampledRelayMultiaddrs,
Expand All @@ -115,8 +123,17 @@ const createRoseNetNode = async ({
services: {
identify: identify(),
pubsub: gossipsub({
allowPublishToZeroPeers: true,
allowPublishToZeroTopicPeers: true,
/**
* Current implementation of Gossipsub includes at most 5000 messages in
* IHAVE or IWANT messages during a `mcachegossip` window, which is by
* default 3 heartbeats. Supposing a limit of 100KB for each message, a
* maximum of around 5000*100KB=500MB is received in 3 heartbeats from
* a single stream, which is 500MB/3≃170MB.
*/
maxInboundDataLength: 170_000_000, // 170MB
}),
restartRelayDiscovery,
},
logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []),
});
Expand Down Expand Up @@ -172,14 +189,12 @@ const createRoseNetNode = async ({
);
},
publish: async (topic: string, message: string) => {
const textEncoder = new TextEncoder();
node.services.pubsub.publish(topic, textEncoder.encode(message));
},
subscribe: async (topic: string, handler: (message: string) => void) => {
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', (event) => {
if (event.detail.topic === topic) {
const textDecoder = new TextDecoder();
handler(textDecoder.decode(event.detail.data));
}
});
Expand Down
130 changes: 130 additions & 0 deletions packages/rosenet-node/lib/libp2p/restart-relay-discovery.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { CircuitRelayTransport } from '@libp2p/circuit-relay-v2/src/transport/transport';
import {
Startable,
serviceDependencies,
ComponentLogger,
Logger,
} from '@libp2p/interface';
import { TransportManager } from '@libp2p/interface-internal';
import { BloomFilter } from '@libp2p/utils/filters';

import {
BrokenCircuitError,
circuitBreaker,
ConsecutiveBreaker,
handleWhenResult,
IterableBackoff,
} from 'cockatiel';

import { MIN_RELAYS, RELAY_DISCOVERY_RESTART_INTERVAL } from '../constants';

interface RestartRelayDiscoveryComponents {
transportManager: TransportManager;
logger: ComponentLogger;
}

/**
* Because of the way libp2p tries to maintain the connection to relays, a
* proper libp2p setup for relay discovery should configure a peer router (such
* as kad-dht) so that random walk can be done. But current implementation of
* RoseNet doesn't configure a peer router. In addition, there MAY be an issue
* in the relay discovery implementation which causes the discovery to halt if
* an unexpected error occurs during the discovery. More details can be found in
* the following issue:
* https://github.com/libp2p/js-libp2p/issues/2676
*
* This libp2p service forcefully restarts the whole relay discovery whenever
* the number of relays become smaller than a threshold. This is not a clean
* method to do so at all, and should be removed when the above issue resolves,
* or when a peer router is added to libp2p instance.
*/
class RestartRelayDiscovery implements Startable {
interval: NodeJS.Timeout | null = null;
breaker = circuitBreaker(
handleWhenResult(() => {
const circuitRelayTransport = this.getCircuitRelayTransport();
return (
circuitRelayTransport.reservationStore.reservationCount() < MIN_RELAYS
);
}),
{
breaker: new ConsecutiveBreaker(5),
halfOpenAfter: new IterableBackoff([30_000, 45_000, 60_000]),
},
);
logger: Logger;

constructor(private components: RestartRelayDiscoveryComponents) {
this.logger = components.logger.forComponent(
'libp2p:restart-relay-discovery',
);
}

readonly [serviceDependencies] = ['@libp2p/circuit-relay-v2-transport'];

/**
* @returns circuit relay transport
*/
private getCircuitRelayTransport = () => {
const circuitRelayTransport = this.components.transportManager
.getTransports()
.find(
(transport) =>
transport[Symbol.toStringTag] ===
'@libp2p/circuit-relay-v2-transport',
) as CircuitRelayTransport;

return circuitRelayTransport;
};

/**
* restart relay discovery and clear reservation store relay filter if the
* number of reservations is smaller than a threshold
*/
private restartRelayDiscoveryIfNeeded = () => {
const circuitRelayTransport = this.getCircuitRelayTransport();

if (
circuitRelayTransport.reservationStore.reservationCount() < MIN_RELAYS
) {
circuitRelayTransport['discovery']!.stopDiscovery();
(
circuitRelayTransport.reservationStore['relayFilter'] as BloomFilter
).clear();
circuitRelayTransport['discovery']!.startDiscovery();
this.logger(
'restarted relay discovery in order to re-connect some of the relays',
);
}
};

/**
* start service
*/
start = () => {
this.interval = setInterval(async () => {
try {
await this.breaker.execute(this.restartRelayDiscoveryIfNeeded);
} catch (error) {
if (error instanceof BrokenCircuitError) {
// log error
this.logger('libp2p:restart-relay-discovery circuit is open');
} else {
throw error;
}
}
}, RELAY_DISCOVERY_RESTART_INTERVAL);
};

/**
* stop service
*/
stop = () => {
this.interval && clearInterval(this.interval);
};
}

const restartRelayDiscovery = (components: RestartRelayDiscoveryComponents) =>
new RestartRelayDiscovery(components);

export default restartRelayDiscovery;
2 changes: 1 addition & 1 deletion packages/rosenet-node/lib/utils/codec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const decode = (source: Source<Uint8ArrayList>) =>
source,
lp.decode,
(source) => map(source, (message) => message.subarray()),
(source) => map(source, textDecoder.decode.bind(textDecoder)),
(source) => map(source, input => textDecoder.decode(input)),
);

export { decode, encode };
22 changes: 13 additions & 9 deletions packages/rosenet-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,27 @@
"node": ">=20.11.0"
},
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@chainsafe/libp2p-noise": "^15.0.0",
"@chainsafe/libp2p-gossipsub": "^13.2.0",
"@chainsafe/libp2p-noise": "^15.1.1",
"@chainsafe/libp2p-yamux": "^6.0.2",
"@libp2p/bootstrap": "^10.0.15",
"@libp2p/circuit-relay-v2": "^1.0.15",
"@libp2p/identify": "^1.0.14",
"@libp2p/peer-id": "^4.0.6",
"@libp2p/bootstrap": "^10.1.5",
"@libp2p/circuit-relay-v2": "^1.1.5",
"@libp2p/identify": "^2.1.5",
"@libp2p/interface": "^1.7.0",
"@libp2p/interface-internal": "^1.3.4",
"@libp2p/peer-id": "^4.2.4",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/tcp": "^9.0.15",
"@multiformats/multiaddr": "^12.2.1",
"@libp2p/tcp": "^9.1.5",
"@libp2p/utils": "^5.4.9",
"@multiformats/multiaddr": "^12.3.0",
"@rosen-bridge/rosenet-utils": "^0.0.0",
"cockatiel": "^3.2.1",
"fast-shuffle": "^6.1.0",
"it-first": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.0.5",
"it-pipe": "^3.0.1",
"libp2p": "^1.2.3",
"libp2p": "^1.9.2",
"public-ip": "^6.0.2"
}
}
31 changes: 29 additions & 2 deletions packages/rosenet-relay/lib/createRoseNetRelay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { yamux } from '@chainsafe/libp2p-yamux';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { tcp } from '@libp2p/tcp';
import { createLibp2p } from 'libp2p';
import { ping } from '@libp2p/ping';
import { isPrivate } from '@libp2p/utils/multiaddr/is-private';

import {
addEventListeners,
Expand All @@ -22,6 +24,8 @@ import packageJson from '../package.json' with { type: 'json' };

import { RoseNetRelayConfig } from './types';

const textDecoder = new TextDecoder();

const createRoseNetRelay = async ({
logger,
...config
Expand Down Expand Up @@ -49,6 +53,8 @@ const createRoseNetRelay = async ({
listen: [
`/ip4/${config.listen?.host ?? DEFAULT_LISTEN_HOST}/tcp/${config.listen?.port ?? '0'}`,
],
announceFilter: (addresses) =>
addresses.filter((address) => !isPrivate(address)),
},
transports: [tcp()],
connectionEncryption: [noise()],
Expand All @@ -74,19 +80,41 @@ const createRoseNetRelay = async ({
},
}),
pubsub: gossipsub({
allowPublishToZeroPeers: true,
allowPublishToZeroTopicPeers: true,
D: 0,
Dlo: 0,
Dhi: 0,
Dout: 0,
/**
* Current implementation of Gossipsub includes at most 5000 messages in
* IHAVE or IWANT messages during a `mcachegossip` window, which is by
* default 3 heartbeats. Supposing a limit of 100KB for each message, a
* maximum of around 5000*100KB=500MB is received in 3 heartbeats from
* a single stream, which is 500MB/3≃170MB.
*/
maxInboundDataLength: 170_000_000, // 170MB
}),
identify: identify(),
ping: ping({
/**
* Connection monitor component of libp2p uses `ping` internally. For
* relays, there is a chance they have lots of connections, beyond the
* default number of allowed `ping` steams. It should be increased to
* prevent stream resets as a result of exceeding those limits.
* Supposing that autodial of relays is disabled, there is no need to
* increase outbound streams limit.
*/
maxInboundStreams: 128,
}),
},
peerDiscovery: [pubsubPeerDiscovery({ listenOnly: true })],
nodeInfo: {
name: 'rosenet-relay',
version: packageJson.version,
},
connectionManager: {
minConnections: 0,
},
logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []),
});

Expand All @@ -100,7 +128,6 @@ const createRoseNetRelay = async ({
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', (event) => {
if (event.detail.topic === topic) {
const textDecoder = new TextDecoder();
handler(textDecoder.decode(event.detail.data));
}
});
Expand Down
16 changes: 9 additions & 7 deletions packages/rosenet-relay/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
"node": ">=20.11.0"
},
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^12.0.0",
"@chainsafe/libp2p-noise": "^15.0.0",
"@chainsafe/libp2p-gossipsub": "^13.2.0",
"@chainsafe/libp2p-noise": "^15.1.1",
"@chainsafe/libp2p-yamux": "^6.0.2",
"@libp2p/circuit-relay-v2": "^1.0.15",
"@libp2p/identify": "^1.0.14",
"@libp2p/interface": "^1.1.3",
"@libp2p/circuit-relay-v2": "^1.1.5",
"@libp2p/identify": "^2.1.5",
"@libp2p/interface": "^1.7.0",
"@libp2p/ping": "^1.1.5",
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/tcp": "^9.0.15",
"@libp2p/tcp": "^9.1.5",
"@libp2p/utils": "^5.4.9",
"@rosen-bridge/logger-interface": "^0.2.0",
"@rosen-bridge/rosenet-utils": "^0.0.0",
"libp2p": "^1.2.3"
"libp2p": "^1.9.2"
}
}

0 comments on commit 5e924f6

Please sign in to comment.