From c32c6eac00f732eff5093c1b3fc2f42360eeaf61 Mon Sep 17 00:00:00 2001 From: alexcos20 <alex.coseru@gmail.com> Date: Fri, 13 Dec 2024 16:06:01 +0200 Subject: [PATCH 1/5] add findPeer & dht refactor --- docs/API.md | 32 ++++++++ docs/env.md | 2 +- src/@types/OceanNode.ts | 9 ++- src/components/P2P/index.ts | 86 +++++++++++----------- src/components/httpRoutes/getOceanPeers.ts | 19 +++++ src/utils/config.ts | 15 +++- 6 files changed, 118 insertions(+), 45 deletions(-) diff --git a/docs/API.md b/docs/API.md index 89a054bf9..f9895ea6c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -449,6 +449,38 @@ returns P2P peer --- +## find peer multiaddress + +### `HTTP` GET /findPeer/? + +#### Description + +returns P2P peer multiaddresses if found in DHT + +#### Query Parameters + +| name | type | required | description | +| ------- | ------ | -------- | ----------- | +| peerId | string | v | peer id | +| timeout | int | optional | timeout | + +#### Response + +``` +{ + "id": "16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP", + "multiaddrs": [ + "/ip4/127.0.0.1/tcp/9000", + "/ip4/127.0.0.1/tcp/9001/ws", + "/ip4/172.18.0.2/tcp/9000", + "/ip4/172.18.0.2/tcp/9001/ws", + "/ip6/::1/tcp/9002" + ] +} +``` + +--- + ## Get P2P Peers ### `HTTP` GET /getP2PPeers diff --git a/docs/env.md b/docs/env.md index 9c718177c..ec7c4b954 100644 --- a/docs/env.md +++ b/docs/env.md @@ -54,7 +54,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/ - `P2P_pubsubPeerDiscoveryInterval`: Interval (in ms) for discovery using pubsub. Defaults to `10000` (three seconds). Example: `10000` - `P2P_dhtMaxInboundStreams`: Maximum number of DHT inbound streams. Defaults to `500`. Example: `500` - `P2P_dhtMaxOutboundStreams`: Maximum number of DHT outbound streams. Defaults to `500`. Example: `500` -- `P2P_ENABLE_DHT_SERVER`: Enable DHT server mode. This should be enabled for bootstrapers & well established nodes. Default: `false` +- `P2P_DHT_FILTER`: Filter address in DHT. 0 = (Default) No filter 1. Filter private ddresses. 2. Filter public addresses - `P2P_mDNSInterval`: Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds). Example: `20000` - `P2P_connectionsMaxParallelDials`: Maximum number of parallel dials. Defaults to `150`. Example: `150` - `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds). Example: `10000` diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index f221a30d8..0d8cb3ba7 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -23,7 +23,12 @@ export interface OceanNodeKeys { privateKey: any ethAddress: string } - +/* eslint-disable no-unused-vars */ +export enum dhtFilterMethod { + filterPrivate = 'filterPrivate', // default, remove all private addresses from DHT + filterPublic = 'filterPublic', // remove all public addresses from DHT + filterNone = 'filterNone' // do not remove all any addresses from DHT +} export interface OceanNodeP2PConfig { bootstrapNodes: string[] bootstrapTimeout: number @@ -41,7 +46,7 @@ export interface OceanNodeP2PConfig { pubsubPeerDiscoveryInterval: number dhtMaxInboundStreams: number dhtMaxOutboundStreams: number - enableDHTServer: boolean + dhtFilter: dhtFilterMethod mDNSInterval: number connectionsMaxParallelDials: number connectionsDialTimeout: number diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index f8333115d..b798eb3e6 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -29,13 +29,22 @@ import { autoNAT } from '@libp2p/autonat' import { uPnPNAT } from '@libp2p/upnp-nat' import { ping } from '@libp2p/ping' import { dcutr } from '@libp2p/dcutr' -import { kadDHT, passthroughMapper } from '@libp2p/kad-dht' +import { + kadDHT, + passthroughMapper, + removePrivateAddressesMapper, + removePublicAddressesMapper +} from '@libp2p/kad-dht' // import { gossipsub } from '@chainsafe/libp2p-gossipsub' import { EVENTS, cidFromRawString } from '../../utils/index.js' import { Transform } from 'stream' import { Database } from '../database' -import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode' +import { + OceanNodeConfig, + FindDDOResponse, + dhtFilterMethod +} from '../../@types/OceanNode.js' // eslint-disable-next-line camelcase import is_ip_private from 'private-ip' import ip from 'ip' @@ -283,43 +292,26 @@ export class OceanP2P extends EventEmitter { multiaddrs.filter((m) => this.shouldAnnounce(m)) } } + const dhtOptions = { + allowQueryWithZeroPeers: false, + maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams, + maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams, + clientMode: false, // always be a server + kBucketSize: 20, + protocol: '/ocean/nodes/1.0.0/kad/1.0.0', + peerInfoMapper: passthroughMapper // see below + } + if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPrivate) + dhtOptions.peerInfoMapper = removePrivateAddressesMapper + if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPublic) + dhtOptions.peerInfoMapper = removePublicAddressesMapper let servicesConfig = { identify: identify(), - /* - pubsub: gossipsub({ - fallbackToFloodsub: false, - batchPublish: false, - allowPublishToZeroTopicPeers: true, - asyncValidation: false, - // messageProcessingConcurrency: 5, - seenTTL: 10 * 1000, - runOnTransientConnection: true, - doPX: doPx, - // canRelayMessage: true, - // enabled: true - allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol'] - }), */ - dht: kadDHT({ - // this is necessary because this node is not connected to the public network - // it can be removed if, for example bootstrappers are configured - allowQueryWithZeroPeers: true, - maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams, - maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams, - - clientMode: false, - kBucketSize: 20, - protocol: '/ocean/nodes/1.0.0/kad/1.0.0', - peerInfoMapper: passthroughMapper - // protocolPrefix: '/ocean/nodes/1.0.0' - // randomWalk: { - // enabled: true, // Allows to disable discovery (enabled by default) - // interval: 300e3, - // timeout: 10e3 - // } - }), + dht: kadDHT(dhtOptions), ping: ping(), dcutr: dcutr() } + // eslint-disable-next-line no-constant-condition, no-self-compare if (config.p2pConfig.enableCircuitRelayServer) { P2P_LOGGER.info('Enabling Circuit Relay Server') @@ -427,13 +419,6 @@ export class OceanP2P extends EventEmitter { this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000) } - if (config.p2pConfig.enableDHTServer) { - try { - await node.services.dht.setMode('server') - } catch (e) { - P2P_LOGGER.warn(`Failed to set mode server for DHT`) - } - } return node } catch (e) { P2P_LOGGER.logMessageWithEmoji( @@ -605,6 +590,25 @@ export class OceanP2P extends EventEmitter { return finalmultiaddrs } + async findPeerInDht(peerName: string, timeout?: number) { + const peer = peerIdFromString(peerName) + console.log(peerName) + console.log(peer) + console.log(timeout) + try { + const data = await this._libp2p.peerRouting.findPeer(peer, { + signal: + isNaN(timeout) || timeout === 0 + ? AbortSignal.timeout(5000) + : AbortSignal.timeout(timeout), + useCache: true, + useNetwork: true + }) + return data + } catch (e) {} + return null + } + async sendTo( peerName: string, message: string, diff --git a/src/components/httpRoutes/getOceanPeers.ts b/src/components/httpRoutes/getOceanPeers.ts index 4c619d72b..0cf4e25d0 100644 --- a/src/components/httpRoutes/getOceanPeers.ts +++ b/src/components/httpRoutes/getOceanPeers.ts @@ -18,6 +18,25 @@ getOceanPeersRoute.get( } } ) + +getOceanPeersRoute.get( + '/findPeer', + express.urlencoded({ extended: true }), + async (req: Request, res: Response): Promise<void> => { + if (!req.query.peerId) { + res.sendStatus(400) + return + } + if (hasP2PInterface) { + const peers = await req.oceanNode + .getP2PNode() + .findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout))) + res.json(peers) + } else { + sendMissingP2PResponse(res) + } + } +) getOceanPeersRoute.get( '/getOceanPeers', async (req: Request, res: Response): Promise<void> => { diff --git a/src/utils/config.ts b/src/utils/config.ts index 17d4fc020..1a81f39c1 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -4,6 +4,7 @@ import type { OceanNodeKeys, OceanNodeDockerConfig } from '../@types/OceanNode' +import { dhtFilterMethod } from '../@types/OceanNode.js' import type { C2DClusterInfo } from '../@types/C2D.js' import { C2DClusterType } from '../@types/C2D.js' import { createFromPrivKey } from '@libp2p/peer-id-factory' @@ -513,6 +514,18 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> { const interfaces = getNodeInterfaces(isStartup) let bootstrapTtl = getIntEnvValue(process.env.P2P_BOOTSTRAP_TTL, 120000) if (bootstrapTtl === 0) bootstrapTtl = Infinity + let dhtFilterOption + switch (getIntEnvValue(process.env.P2P_DHT_FILTER, 0)) { + case 1: + dhtFilterOption = dhtFilterMethod.filterPrivate + break + case 2: + dhtFilterOption = dhtFilterMethod.filterPublic + break + default: + dhtFilterOption = dhtFilterMethod.filterNone + } + const config: OceanNodeConfig = { authorizedDecrypters: getAuthorizedDecrypters(isStartup), allowedValidators: getAllowedValidators(isStartup), @@ -549,7 +562,7 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> { ), dhtMaxInboundStreams: getIntEnvValue(process.env.P2P_dhtMaxInboundStreams, 500), dhtMaxOutboundStreams: getIntEnvValue(process.env.P2P_dhtMaxOutboundStreams, 500), - enableDHTServer: getBoolEnvValue('P2P_ENABLE_DHT_SERVER', false), + dhtFilter: dhtFilterOption, mDNSInterval: getIntEnvValue(process.env.P2P_mDNSInterval, 20e3), // 20 seconds connectionsMaxParallelDials: getIntEnvValue( process.env.P2P_connectionsMaxParallelDials, From dc3f9b790b97cdf86235b26433702c571861d22c Mon Sep 17 00:00:00 2001 From: alexcos20 <alex.coseru@gmail.com> Date: Wed, 18 Dec 2024 08:32:09 +0200 Subject: [PATCH 2/5] remove debugs --- src/components/P2P/index.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index b798eb3e6..b56906f8b 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -592,9 +592,6 @@ export class OceanP2P extends EventEmitter { async findPeerInDht(peerName: string, timeout?: number) { const peer = peerIdFromString(peerName) - console.log(peerName) - console.log(peer) - console.log(timeout) try { const data = await this._libp2p.peerRouting.findPeer(peer, { signal: From d782dd5c09fc10e620a25a69b08aa3fd4904fe6e Mon Sep 17 00:00:00 2001 From: alexcos20 <alex.coseru@gmail.com> Date: Wed, 8 Jan 2025 11:17:31 +0200 Subject: [PATCH 3/5] fix comments --- src/components/P2P/index.ts | 2 +- src/components/httpRoutes/getOceanPeers.ts | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index b56906f8b..68062ac6b 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -591,8 +591,8 @@ export class OceanP2P extends EventEmitter { } async findPeerInDht(peerName: string, timeout?: number) { - const peer = peerIdFromString(peerName) try { + const peer = peerIdFromString(peerName) const data = await this._libp2p.peerRouting.findPeer(peer, { signal: isNaN(timeout) || timeout === 0 diff --git a/src/components/httpRoutes/getOceanPeers.ts b/src/components/httpRoutes/getOceanPeers.ts index 0cf4e25d0..5ed5caa05 100644 --- a/src/components/httpRoutes/getOceanPeers.ts +++ b/src/components/httpRoutes/getOceanPeers.ts @@ -31,7 +31,8 @@ getOceanPeersRoute.get( const peers = await req.oceanNode .getP2PNode() .findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout))) - res.json(peers) + if (peers) res.json(peers) + else res.sendStatus(404).send('Cannot find peer') } else { sendMissingP2PResponse(res) } From 0b86b4da43544328ac6dd0fa2ea92cc405569a2c Mon Sep 17 00:00:00 2001 From: Alex Coseru <alex.coseru@gmail.com> Date: Mon, 13 Jan 2025 08:08:22 +0200 Subject: [PATCH 4/5] Update ci.yml --- .github/workflows/ci.yml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 14f83117f..0e3eaae22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,15 +132,6 @@ jobs: - run: docker image ls - name: Delete default runner images run: | - docker image rm node:20 - docker image rm node:20-alpine - docker image rm node:18 - docker image rm node:18-alpine - docker image rm debian:10 - docker image rm debian:11 - docker image rm ubuntu:22.04 - docker image rm ubuntu:20.04 - docker image rm moby/buildkit:latest rm -rf /usr/share/swift/ - name: Wait for contracts deployment and C2D cluster to be ready working-directory: ${{ github.workspace }}/barge From 4ae11e4edafe7b21638bf719d6ac6e7d18dd795a Mon Sep 17 00:00:00 2001 From: Alex Coseru <alex.coseru@gmail.com> Date: Mon, 13 Jan 2025 08:18:46 +0200 Subject: [PATCH 5/5] Update ci.yml --- .github/workflows/ci.yml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0e3eaae22..57a8da879 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -217,15 +217,6 @@ jobs: - run: docker image ls - name: Delete default runner images run: | - docker image rm node:20 - docker image rm node:20-alpine - docker image rm node:18 - docker image rm node:18-alpine - docker image rm debian:10 - docker image rm debian:11 - docker image rm ubuntu:22.04 - docker image rm ubuntu:20.04 - docker image rm moby/buildkit:latest rm -rf /usr/share/swift/ - name: Wait for contracts deployment and C2D cluster to be ready