Skip to content

Commit

Permalink
chore: capping mechanism for relay and service connections (#3184)
Browse files Browse the repository at this point in the history
  • Loading branch information
darshankabariya authored Jan 21, 2025
1 parent 6d167dd commit 2942782
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 96 deletions.
7 changes: 6 additions & 1 deletion apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,12 @@ proc initAndStartApp(
nodeBuilder.withNodeKey(key)
nodeBuilder.withRecord(record)
nodeBUilder.withSwitchConfiguration(maxConnections = some(MaxConnectedPeers))
nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true)

nodeBuilder.withPeerManagerConfig(
maxConnections = MaxConnectedPeers,
relayServiceRatio = "13.33:86.67",
shardAware = true,
)
let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
if res.isErr():
return err("node building error" & $res.error)
Expand Down
57 changes: 30 additions & 27 deletions tests/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -938,8 +938,8 @@ procSuite "Peer Manager":

test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections
let peerStoreSize = 5
let maxConnections = 10
let peerStoreSize = 20
let maxConnections = 25

expect(Defect):
let pm = PeerManager.new(
Expand All @@ -962,54 +962,61 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 1,
maxRelayPeers = some(5),
storage = nil,
)

# Create 15 peers and add them to the peerstore
let peers = toSeq(1 .. 15)
# Create 30 peers and add them to the peerstore
let peers = toSeq(1 .. 30)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
.filterIt(it.isOk())
.mapIt(it.value)
for p in peers:
pm.addPeer(p)

# Check that we have 15 peers in the peerstore
# Check that we have 30 peers in the peerstore
check:
pm.wakuPeerStore.peers.len == 15
pm.wakuPeerStore.peers.len == 30

# fake that some peers failed to connect
pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[3].peerId] = 2
pm.wakuPeerStore[NumberFailedConnBook][peers[4].peerId] = 2

# fake that some peers are connected
pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[10].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[12].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[15].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[18].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[24].peerId] = Connected
pm.wakuPeerStore[ConnectionBook][peers[29].peerId] = Connected

# Prune the peerstore (current=15, target=5)
# Prune the peerstore (current=30, target=25)
pm.prunePeerStore()

check:
# ensure peerstore was pruned
pm.wakuPeerStore.peers.len == 10
pm.wakuPeerStore.peers.len == 25

# ensure connected peers were not pruned
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[10].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[12].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[15].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[18].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[24].peerId)
pm.wakuPeerStore.peers.anyIt(it.peerId == peers[29].peerId)

# ensure peers that failed were the first to be pruned
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[3].peerId)
not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[4].peerId)

asyncTest "canBeConnected() returns correct value":
let pm = PeerManager.new(
Expand All @@ -1018,14 +1025,13 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
initialBackoffInSec = 1,
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
backoffFactor = 2,
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)
var p1: PeerId
Expand Down Expand Up @@ -1075,10 +1081,9 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxRelayPeers = some(5),
maxFailedAttempts = 150,
storage = nil,
)
Expand All @@ -1091,11 +1096,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 10,
maxRelayPeers = some(5),
storage = nil,
)

Expand All @@ -1105,11 +1109,10 @@ procSuite "Peer Manager":
.withRng(rng)
.withMplex()
.withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.withPeerStore(25)
.withMaxConnections(20)
.build(),
maxFailedAttempts = 5,
maxRelayPeers = some(5),
storage = nil,
)

Expand Down
65 changes: 34 additions & 31 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,49 +106,52 @@ suite "WakuNode":

await allFutures([node1.stop(), node2.stop()])

asyncTest "Maximum connections can be configured":
asyncTest "Maximum connections can be configured with 20 nodes":
let
maxConnections = 2
maxConnections = 20
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(
nodeKey1,
parseIpAddress("0.0.0.0"),
parseIpAddress("127.0.0.1"),
Port(60010),
maxConnections = maxConnections,
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(60012))
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(60013))

check:
# Sanity check, to verify config was applied
node1.switch.connManager.inSema.size == maxConnections

# Node with connection limit set to 1
# Initialize and start node1
await node1.start()
await node1.mountRelay()

# Remote node 1
await node2.start()
await node2.mountRelay()

# Remote node 2
await node3.start()
await node3.mountRelay()

discard
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(3.seconds)
discard
await node1.peerManager.connectPeer(node3.switch.peerInfo.toRemotePeerInfo())

# Create an array to hold the other nodes
var otherNodes: seq[WakuNode] = @[]

# Create and start 20 other nodes
for i in 0 ..< maxConnections + 1:
let
nodeKey = generateSecp256k1Key()
port = 60012 + i * 2 # Ensure unique ports for each node
node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port))
await node.start()
await node.mountRelay()
otherNodes.add(node)

# Connect all other nodes to node1
for node in otherNodes:
discard
await node1.peerManager.connectPeer(node.switch.peerInfo.toRemotePeerInfo())
await sleepAsync(2.seconds) # Small delay to avoid hammering the connection process

# Check that the number of connections matches the maxConnections
check:
# Verify that only the first connection succeeded
node1.switch.isConnected(node2.switch.peerInfo.peerId)
node1.switch.isConnected(node3.switch.peerInfo.peerId) == false

await allFutures([node1.stop(), node2.stop(), node3.stop()])
node1.switch.isConnected(otherNodes[0].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[8].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[14].switch.peerInfo.peerId)
node1.switch.isConnected(otherNodes[20].switch.peerInfo.peerId) == false

# Stop all nodes
var stopFutures = @[node1.stop()]
for node in otherNodes:
stopFutures.add(node.stop())
await allFutures(stopFutures)

asyncTest "Messages fails with wrong key path":
let nodeKey1 = generateSecp256k1Key()
Expand Down
1 change: 1 addition & 0 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
dnsAddrsNameServers: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
nat: "any",
maxConnections: 50,
relayServiceRatio: "60:40",
maxMessageSize: "1024 KiB",
clusterId: DefaultClusterId,
shards: @[DefaultShardId],
Expand Down
25 changes: 24 additions & 1 deletion waku/common/utils/parse_size_units.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import std/strutils, results, regex
import std/[strutils, math], results, regex

proc parseMsgSize*(input: string): Result[uint64, string] =
## Parses size strings such as "1.2 KiB" or "3Kb" and returns the equivalent number of bytes
Expand Down Expand Up @@ -49,3 +49,26 @@ proc parseCorrectMsgSize*(input: string): uint64 =
let ret = parseMsgSize(input).valueOr:
return 0
return ret

proc parseRelayServiceRatio*(ratio: string): Result[(float, float), string] =
  ## Parses a "relay:service" percentage ratio string into a pair of fractions.
  ## The two percentages must sum to 100; the result is normalized to [0, 1].
  ## e.g., parseRelayServiceRatio("40:60") == ok((0.4, 0.6))
  let elements = ratio.split(":")
  if elements.len != 2:
    return err("expected format 'X:Y', ratio = " & ratio)

  var relayRatio, serviceRatio: float
  try:
    # strip() tolerates incidental whitespace, e.g. "40 : 60"
    relayRatio = parseFloat(elements[0].strip())
    serviceRatio = parseFloat(elements[1].strip())
  except ValueError:
    return err("failed to parse ratio numbers: " & ratio)

  if relayRatio < 0 or serviceRatio < 0:
    return err("relay service ratio must be non-negative, ratio = " & ratio)

  let total = relayRatio + serviceRatio
  # Compare with a tolerance instead of int(total) != 100: int() truncates,
  # so a float sum such as 99.999999999 (e.g. from "13.33:86.67") would be
  # wrongly rejected even though the intended total is exactly 100.
  if abs(total - 100.0) > 1e-6:
    return err("total ratio should be 100, total = " & $total)

  ok((relayRatio / 100.0, serviceRatio / 100.0))
23 changes: 17 additions & 6 deletions waku/factory/builder.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.push raises: [].}

import
std/[options, net],
std/[options, net, math],
results,
chronicles,
libp2p/crypto/crypto,
Expand All @@ -15,7 +15,8 @@ import
../discovery/waku_discv5,
../waku_node,
../node/peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../common/utils/parse_size_units

type
WakuNodeBuilder* = object # General
Expand All @@ -29,7 +30,8 @@ type
peerStorageCapacity: Option[int]

# Peer manager config
maxRelayPeers: Option[int]
maxRelayPeers: int
maxServicePeers: int
colocationLimit: int
shardAware: bool

Expand Down Expand Up @@ -108,9 +110,17 @@ proc withPeerStorage*(
builder.peerStorageCapacity = capacity

proc withPeerManagerConfig*(
    builder: var WakuNodeBuilder,
    maxConnections: int,
    relayServiceRatio: string,
    shardAware = false,
) =
  ## Configures the peer manager by splitting `maxConnections` between relay
  ## and service peers according to `relayServiceRatio` (e.g. "60:40").
  ## NOTE(review): `.get()` raises a Defect if the ratio string is malformed —
  ## presumably the ratio was already validated by config parsing; confirm callers.
  let (relayRatio, serviceRatio) = parseRelayServiceRatio(relayServiceRatio).get()
  # ceil for relay / floor for service: since the two fractions sum to 1,
  # the split never exceeds maxConnections and any fractional connection
  # slot is awarded to relay.
  let relayPeers = int(ceil(float(maxConnections) * relayRatio))
  let servicePeers = int(floor(float(maxConnections) * serviceRatio))

  builder.maxServicePeers = servicePeers
  builder.maxRelayPeers = relayPeers
  builder.shardAware = shardAware

proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
Expand Down Expand Up @@ -190,7 +200,8 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
let peerManager = PeerManager.new(
switch = switch,
storage = builder.peerStorage.get(nil),
maxRelayPeers = builder.maxRelayPeers,
maxRelayPeers = some(builder.maxRelayPeers),
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
)
Expand Down
19 changes: 14 additions & 5 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,20 @@ type WakuNodeConf* = object
desc: "Maximum allowed number of libp2p connections.",
defaultValue: 50,
name: "max-connections"
.}: uint16
.}: int

maxRelayPeers* {.
desc:
"Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
name: "max-relay-peers"
.}: Option[int]

relayServiceRatio* {.
desc:
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
name: "relay-service-ratio",
defaultValue: "60:40" # 60:40 ratio of relay to service peers
.}: string

colocationLimit* {.
desc:
Expand All @@ -206,10 +219,6 @@ type WakuNodeConf* = object
name: "ip-colocation-limit"
.}: int

maxRelayPeers* {.
desc: "Maximum allowed number of relay peers.", name: "max-relay-peers"
.}: Option[int]

peerStoreCapacity* {.
desc: "Maximum stored peers in the peerstore.", name: "peer-store-capacity"
.}: Option[int]
Expand Down
Loading

0 comments on commit 2942782

Please sign in to comment.