Skip to content

Commit

Permalink
add findPeer & dht refactor (#793)
Browse files Browse the repository at this point in the history
* add findPeer & dht refactor
  • Loading branch information
alexcos20 authored Jan 13, 2025
1 parent e05ab46 commit c94b0dd
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 49 deletions.
18 changes: 0 additions & 18 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,6 @@ jobs:
- run: docker image ls
- name: Delete default runner images
run: |
docker image rm node:20
docker image rm node:20-alpine
docker image rm node:18
docker image rm node:18-alpine
docker image rm debian:10
docker image rm debian:11
docker image rm ubuntu:22.04
docker image rm ubuntu:20.04
docker image rm moby/buildkit:latest
rm -rf /usr/share/swift/
- name: Wait for contracts deployment and C2D cluster to be ready
working-directory: ${{ github.workspace }}/barge
Expand Down Expand Up @@ -226,15 +217,6 @@ jobs:
- run: docker image ls
- name: Delete default runner images
run: |
docker image rm node:20
docker image rm node:20-alpine
docker image rm node:18
docker image rm node:18-alpine
docker image rm debian:10
docker image rm debian:11
docker image rm ubuntu:22.04
docker image rm ubuntu:20.04
docker image rm moby/buildkit:latest
rm -rf /usr/share/swift/
- name: Wait for contracts deployment and C2D cluster to be ready
Expand Down
32 changes: 32 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,38 @@ returns P2P peer

---

## find peer multiaddress

### `HTTP` GET /findPeer/?

#### Description

returns P2P peer multiaddresses if found in DHT

#### Query Parameters

| name | type | required | description |
| ------- | ------ | -------- | ----------- |
| peerId | string | v | peer id |
| timeout | int | optional | timeout |

#### Response

```
{
"id": "16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP",
"multiaddrs": [
"/ip4/127.0.0.1/tcp/9000",
"/ip4/127.0.0.1/tcp/9001/ws",
"/ip4/172.18.0.2/tcp/9000",
"/ip4/172.18.0.2/tcp/9001/ws",
"/ip6/::1/tcp/9002"
]
}
```

---

## Get P2P Peers

### `HTTP` GET /getP2PPeers
Expand Down
2 changes: 1 addition & 1 deletion docs/env.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
- `P2P_pubsubPeerDiscoveryInterval`: Interval (in ms) for discovery using pubsub. Defaults to `10000` (three seconds). Example: `10000`
- `P2P_dhtMaxInboundStreams`: Maximum number of DHT inbound streams. Defaults to `500`. Example: `500`
- `P2P_dhtMaxOutboundStreams`: Maximum number of DHT outbound streams. Defaults to `500`. Example: `500`
- `P2P_ENABLE_DHT_SERVER`: Enable DHT server mode. This should be enabled for bootstrapers & well established nodes. Default: `false`
- `P2P_DHT_FILTER`: Filter address in DHT. 0 = (Default) No filter 1. Filter private ddresses. 2. Filter public addresses
- `P2P_mDNSInterval`: Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds). Example: `20000`
- `P2P_connectionsMaxParallelDials`: Maximum number of parallel dials. Defaults to `150`. Example: `150`
- `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds). Example: `10000`
Expand Down
9 changes: 7 additions & 2 deletions src/@types/OceanNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ export interface OceanNodeKeys {
privateKey: any
ethAddress: string
}

/* eslint-disable no-unused-vars */
export enum dhtFilterMethod {
filterPrivate = 'filterPrivate', // default, remove all private addresses from DHT
filterPublic = 'filterPublic', // remove all public addresses from DHT
filterNone = 'filterNone' // do not remove all any addresses from DHT
}
export interface OceanNodeP2PConfig {
bootstrapNodes: string[]
bootstrapTimeout: number
Expand All @@ -41,7 +46,7 @@ export interface OceanNodeP2PConfig {
pubsubPeerDiscoveryInterval: number
dhtMaxInboundStreams: number
dhtMaxOutboundStreams: number
enableDHTServer: boolean
dhtFilter: dhtFilterMethod
mDNSInterval: number
connectionsMaxParallelDials: number
connectionsDialTimeout: number
Expand Down
69 changes: 42 additions & 27 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,22 @@ import { autoNAT } from '@libp2p/autonat'
import { uPnPNAT } from '@libp2p/upnp-nat'
import { ping } from '@libp2p/ping'
import { dcutr } from '@libp2p/dcutr'
import { kadDHT, passthroughMapper } from '@libp2p/kad-dht'
import {
kadDHT,
passthroughMapper,
removePrivateAddressesMapper,
removePublicAddressesMapper
} from '@libp2p/kad-dht'
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'

import { EVENTS, cidFromRawString } from '../../utils/index.js'
import { Transform } from 'stream'
import { Database } from '../database'
import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode'
import {
OceanNodeConfig,
FindDDOResponse,
dhtFilterMethod
} from '../../@types/OceanNode.js'
// eslint-disable-next-line camelcase
import is_ip_private from 'private-ip'
import ip from 'ip'
Expand Down Expand Up @@ -275,8 +284,22 @@ export class OceanP2P extends EventEmitter {
multiaddrs.filter((m) => this.shouldAnnounce(m))
}
}
const dhtOptions = {
allowQueryWithZeroPeers: false,
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,
clientMode: false, // always be a server
kBucketSize: 20,
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
peerInfoMapper: passthroughMapper // see below
}
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPrivate)
dhtOptions.peerInfoMapper = removePrivateAddressesMapper
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPublic)
dhtOptions.peerInfoMapper = removePublicAddressesMapper
let servicesConfig = {
identify: identify(),
dht: kadDHT(dhtOptions),
identifyPush: identifyPush(),
/*
pubsub: gossipsub({
Expand All @@ -292,27 +315,10 @@ export class OceanP2P extends EventEmitter {
// enabled: true
allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol']
}), */
dht: kadDHT({
// this is necessary because this node is not connected to the public network
// it can be removed if, for example bootstrappers are configured
allowQueryWithZeroPeers: true,
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,

clientMode: false,
kBucketSize: 20,
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
peerInfoMapper: passthroughMapper
// protocolPrefix: '/ocean/nodes/1.0.0'
// randomWalk: {
// enabled: true, // Allows to disable discovery (enabled by default)
// interval: 300e3,
// timeout: 10e3
// }
}),
ping: ping(),
dcutr: dcutr()
}

// eslint-disable-next-line no-constant-condition, no-self-compare
if (config.p2pConfig.enableCircuitRelayServer) {
P2P_LOGGER.info('Enabling Circuit Relay Server')
Expand Down Expand Up @@ -420,13 +426,6 @@ export class OceanP2P extends EventEmitter {
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
}

if (config.p2pConfig.enableDHTServer) {
try {
await node.services.dht.setMode('server')
} catch (e) {
P2P_LOGGER.warn(`Failed to set mode server for DHT`)
}
}
return node
} catch (e) {
P2P_LOGGER.logMessageWithEmoji(
Expand Down Expand Up @@ -598,6 +597,22 @@ export class OceanP2P extends EventEmitter {
return finalmultiaddrs
}

async findPeerInDht(peerName: string, timeout?: number) {
try {
const peer = peerIdFromString(peerName)
const data = await this._libp2p.peerRouting.findPeer(peer, {
signal:
isNaN(timeout) || timeout === 0
? AbortSignal.timeout(5000)
: AbortSignal.timeout(timeout),
useCache: true,
useNetwork: true
})
return data
} catch (e) {}
return null
}

async sendTo(
peerName: string,
message: string,
Expand Down
20 changes: 20 additions & 0 deletions src/components/httpRoutes/getOceanPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ getOceanPeersRoute.get(
}
}
)

getOceanPeersRoute.get(
'/findPeer',
express.urlencoded({ extended: true }),
async (req: Request, res: Response): Promise<void> => {
if (!req.query.peerId) {
res.sendStatus(400)
return
}
if (hasP2PInterface) {
const peers = await req.oceanNode
.getP2PNode()
.findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout)))
if (peers) res.json(peers)
else res.sendStatus(404).send('Cannot find peer')
} else {
sendMissingP2PResponse(res)
}
}
)
getOceanPeersRoute.get(
'/getOceanPeers',
async (req: Request, res: Response): Promise<void> => {
Expand Down
15 changes: 14 additions & 1 deletion src/utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
OceanNodeKeys,
OceanNodeDockerConfig
} from '../@types/OceanNode'
import { dhtFilterMethod } from '../@types/OceanNode.js'
import type { C2DClusterInfo } from '../@types/C2D.js'
import { C2DClusterType } from '../@types/C2D.js'
import { createFromPrivKey } from '@libp2p/peer-id-factory'
Expand Down Expand Up @@ -548,6 +549,18 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
const interfaces = getNodeInterfaces(isStartup)
let bootstrapTtl = getIntEnvValue(process.env.P2P_BOOTSTRAP_TTL, 120000)
if (bootstrapTtl === 0) bootstrapTtl = Infinity
let dhtFilterOption
switch (getIntEnvValue(process.env.P2P_DHT_FILTER, 0)) {
case 1:
dhtFilterOption = dhtFilterMethod.filterPrivate
break
case 2:
dhtFilterOption = dhtFilterMethod.filterPublic
break
default:
dhtFilterOption = dhtFilterMethod.filterNone
}

const config: OceanNodeConfig = {
authorizedDecrypters: getAuthorizedDecrypters(isStartup),
allowedValidators: getAllowedValidators(isStartup),
Expand Down Expand Up @@ -584,7 +597,7 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
),
dhtMaxInboundStreams: getIntEnvValue(process.env.P2P_dhtMaxInboundStreams, 500),
dhtMaxOutboundStreams: getIntEnvValue(process.env.P2P_dhtMaxOutboundStreams, 500),
enableDHTServer: getBoolEnvValue('P2P_ENABLE_DHT_SERVER', false),
dhtFilter: dhtFilterOption,
mDNSInterval: getIntEnvValue(process.env.P2P_mDNSInterval, 20e3), // 20 seconds
connectionsMaxParallelDials: getIntEnvValue(
process.env.P2P_connectionsMaxParallelDials,
Expand Down

0 comments on commit c94b0dd

Please sign in to comment.