diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 14f83117f..57a8da879 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,15 +132,6 @@ jobs: - run: docker image ls - name: Delete default runner images run: | - docker image rm node:20 - docker image rm node:20-alpine - docker image rm node:18 - docker image rm node:18-alpine - docker image rm debian:10 - docker image rm debian:11 - docker image rm ubuntu:22.04 - docker image rm ubuntu:20.04 - docker image rm moby/buildkit:latest rm -rf /usr/share/swift/ - name: Wait for contracts deployment and C2D cluster to be ready working-directory: ${{ github.workspace }}/barge @@ -226,15 +217,6 @@ jobs: - run: docker image ls - name: Delete default runner images run: | - docker image rm node:20 - docker image rm node:20-alpine - docker image rm node:18 - docker image rm node:18-alpine - docker image rm debian:10 - docker image rm debian:11 - docker image rm ubuntu:22.04 - docker image rm ubuntu:20.04 - docker image rm moby/buildkit:latest rm -rf /usr/share/swift/ - name: Wait for contracts deployment and C2D cluster to be ready diff --git a/docs/API.md b/docs/API.md index fafcb7ea6..6bade2969 100644 --- a/docs/API.md +++ b/docs/API.md @@ -449,6 +449,38 @@ returns P2P peer --- +## find peer multiaddress + +### `HTTP` GET /findPeer/? + +#### Description + +returns P2P peer multiaddresses if found in DHT + +#### Query Parameters + +| name | type | required | description | +| ------- | ------ | -------- | ----------- | +| peerId | string | v | peer id | +| timeout | int | optional | timeout | + +#### Response + +``` +{ + "id": "16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP", + "multiaddrs": [ + "/ip4/127.0.0.1/tcp/9000", + "/ip4/127.0.0.1/tcp/9001/ws", + "/ip4/172.18.0.2/tcp/9000", + "/ip4/172.18.0.2/tcp/9001/ws", + "/ip6/::1/tcp/9002" + ] +} +``` + +--- + ## Get P2P Peers ### `HTTP` GET /getP2PPeers diff --git a/docs/env.md b/docs/env.md index d23b46512..d6bd1e9da 100644 --- a/docs/env.md +++ b/docs/env.md @@ -55,7 +55,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/ - `P2P_pubsubPeerDiscoveryInterval`: Interval (in ms) for discovery using pubsub. Defaults to `10000` (three seconds). Example: `10000` - `P2P_dhtMaxInboundStreams`: Maximum number of DHT inbound streams. Defaults to `500`. Example: `500` - `P2P_dhtMaxOutboundStreams`: Maximum number of DHT outbound streams. Defaults to `500`. Example: `500` -- `P2P_ENABLE_DHT_SERVER`: Enable DHT server mode. This should be enabled for bootstrapers & well established nodes. Default: `false` +- `P2P_DHT_FILTER`: Filter address in DHT. 0 = (Default) No filter 1. Filter private ddresses. 2. Filter public addresses - `P2P_mDNSInterval`: Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds). Example: `20000` - `P2P_connectionsMaxParallelDials`: Maximum number of parallel dials. Defaults to `150`. Example: `150` - `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds). Example: `10000` diff --git a/src/@types/OceanNode.ts b/src/@types/OceanNode.ts index d213939ff..ea58cddbd 100644 --- a/src/@types/OceanNode.ts +++ b/src/@types/OceanNode.ts @@ -23,7 +23,12 @@ export interface OceanNodeKeys { privateKey: any ethAddress: string } - +/* eslint-disable no-unused-vars */ +export enum dhtFilterMethod { + filterPrivate = 'filterPrivate', // default, remove all private addresses from DHT + filterPublic = 'filterPublic', // remove all public addresses from DHT + filterNone = 'filterNone' // do not remove all any addresses from DHT +} export interface OceanNodeP2PConfig { bootstrapNodes: string[] bootstrapTimeout: number @@ -41,7 +46,7 @@ export interface OceanNodeP2PConfig { pubsubPeerDiscoveryInterval: number dhtMaxInboundStreams: number dhtMaxOutboundStreams: number - enableDHTServer: boolean + dhtFilter: dhtFilterMethod mDNSInterval: number connectionsMaxParallelDials: number connectionsDialTimeout: number diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 8d0b35838..0e9d95bf0 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -29,13 +29,22 @@ import { autoNAT } from '@libp2p/autonat' import { uPnPNAT } from '@libp2p/upnp-nat' import { ping } from '@libp2p/ping' import { dcutr } from '@libp2p/dcutr' -import { kadDHT, passthroughMapper } from '@libp2p/kad-dht' +import { + kadDHT, + passthroughMapper, + removePrivateAddressesMapper, + removePublicAddressesMapper +} from '@libp2p/kad-dht' // import { gossipsub } from '@chainsafe/libp2p-gossipsub' import { EVENTS, cidFromRawString } from '../../utils/index.js' import { Transform } from 'stream' import { Database } from '../database' -import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode' +import { + OceanNodeConfig, + FindDDOResponse, + dhtFilterMethod +} from '../../@types/OceanNode.js' // eslint-disable-next-line camelcase import is_ip_private from 'private-ip' import ip from 'ip' @@ -275,8 +284,22 @@ export class OceanP2P extends EventEmitter { multiaddrs.filter((m) => this.shouldAnnounce(m)) } } + const dhtOptions = { + allowQueryWithZeroPeers: false, + maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams, + maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams, + clientMode: false, // always be a server + kBucketSize: 20, + protocol: '/ocean/nodes/1.0.0/kad/1.0.0', + peerInfoMapper: passthroughMapper // see below + } + if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPrivate) + dhtOptions.peerInfoMapper = removePrivateAddressesMapper + if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPublic) + dhtOptions.peerInfoMapper = removePublicAddressesMapper let servicesConfig = { identify: identify(), + dht: kadDHT(dhtOptions), identifyPush: identifyPush(), /* pubsub: gossipsub({ @@ -292,27 +315,10 @@ export class OceanP2P extends EventEmitter { // enabled: true allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol'] }), */ - dht: kadDHT({ - // this is necessary because this node is not connected to the public network - // it can be removed if, for example bootstrappers are configured - allowQueryWithZeroPeers: true, - maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams, - maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams, - - clientMode: false, - kBucketSize: 20, - protocol: '/ocean/nodes/1.0.0/kad/1.0.0', - peerInfoMapper: passthroughMapper - // protocolPrefix: '/ocean/nodes/1.0.0' - // randomWalk: { - // enabled: true, // Allows to disable discovery (enabled by default) - // interval: 300e3, - // timeout: 10e3 - // } - }), ping: ping(), dcutr: dcutr() } + // eslint-disable-next-line no-constant-condition, no-self-compare if (config.p2pConfig.enableCircuitRelayServer) { P2P_LOGGER.info('Enabling Circuit Relay Server') @@ -420,13 +426,6 @@ export class OceanP2P extends EventEmitter { this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000) } - if (config.p2pConfig.enableDHTServer) { - try { - await node.services.dht.setMode('server') - } catch (e) { - P2P_LOGGER.warn(`Failed to set mode server for DHT`) - } - } return node } catch (e) { P2P_LOGGER.logMessageWithEmoji( @@ -598,6 +597,22 @@ export class OceanP2P extends EventEmitter { return finalmultiaddrs } + async findPeerInDht(peerName: string, timeout?: number) { + try { + const peer = peerIdFromString(peerName) + const data = await this._libp2p.peerRouting.findPeer(peer, { + signal: + isNaN(timeout) || timeout === 0 + ? AbortSignal.timeout(5000) + : AbortSignal.timeout(timeout), + useCache: true, + useNetwork: true + }) + return data + } catch (e) {} + return null + } + async sendTo( peerName: string, message: string, diff --git a/src/components/httpRoutes/getOceanPeers.ts b/src/components/httpRoutes/getOceanPeers.ts index 4c619d72b..5ed5caa05 100644 --- a/src/components/httpRoutes/getOceanPeers.ts +++ b/src/components/httpRoutes/getOceanPeers.ts @@ -18,6 +18,26 @@ getOceanPeersRoute.get( } } ) + +getOceanPeersRoute.get( + '/findPeer', + express.urlencoded({ extended: true }), + async (req: Request, res: Response): Promise<void> => { + if (!req.query.peerId) { + res.sendStatus(400) + return + } + if (hasP2PInterface) { + const peers = await req.oceanNode + .getP2PNode() + .findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout))) + if (peers) res.json(peers) + else res.sendStatus(404).send('Cannot find peer') + } else { + sendMissingP2PResponse(res) + } + } +) getOceanPeersRoute.get( '/getOceanPeers', async (req: Request, res: Response): Promise<void> => { diff --git a/src/utils/config.ts b/src/utils/config.ts index c7e56b0ca..f73ae5960 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -4,6 +4,7 @@ import type { OceanNodeKeys, OceanNodeDockerConfig } from '../@types/OceanNode' +import { dhtFilterMethod } from '../@types/OceanNode.js' import type { C2DClusterInfo } from '../@types/C2D.js' import { C2DClusterType } from '../@types/C2D.js' import { createFromPrivKey } from '@libp2p/peer-id-factory' @@ -548,6 +549,18 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> { const interfaces = getNodeInterfaces(isStartup) let bootstrapTtl = getIntEnvValue(process.env.P2P_BOOTSTRAP_TTL, 120000) if (bootstrapTtl === 0) bootstrapTtl = Infinity + let dhtFilterOption + switch (getIntEnvValue(process.env.P2P_DHT_FILTER, 0)) { + case 1: + dhtFilterOption = dhtFilterMethod.filterPrivate + break + case 2: + dhtFilterOption = dhtFilterMethod.filterPublic + break + default: + dhtFilterOption = dhtFilterMethod.filterNone + } + const config: OceanNodeConfig = { authorizedDecrypters: getAuthorizedDecrypters(isStartup), allowedValidators: getAllowedValidators(isStartup), @@ -584,7 +597,7 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> { ), dhtMaxInboundStreams: getIntEnvValue(process.env.P2P_dhtMaxInboundStreams, 500), dhtMaxOutboundStreams: getIntEnvValue(process.env.P2P_dhtMaxOutboundStreams, 500), - enableDHTServer: getBoolEnvValue('P2P_ENABLE_DHT_SERVER', false), + dhtFilter: dhtFilterOption, mDNSInterval: getIntEnvValue(process.env.P2P_mDNSInterval, 20e3), // 20 seconds connectionsMaxParallelDials: getIntEnvValue( process.env.P2P_connectionsMaxParallelDials,