Skip to content

Commit

Permalink
config & setup
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Dec 17, 2024
1 parent 049fbea commit d9de358
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 4 deletions.
3 changes: 3 additions & 0 deletions tests/waku_store_sync/test_all.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{.used.}

import ./test_protocol, ./test_storage, ./test_codec
8 changes: 8 additions & 0 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,14 @@ proc setupProtocols(
else:
return err("failed to set node waku filter peer: " & filterNode.error)

if conf.storeSync:
(
await node.mountStoreSync(
conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter
)
).isOkOr:
return err("failed to mount waku store sync protocol: " & $error)

# waku peer exchange setup
if conf.peerExchange:
try:
Expand Down
48 changes: 48 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import
../waku_store/client as store_client,
../waku_store/common as store_common,
../waku_store/resume,
../waku_store_sync,
../waku_filter_v2,
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
Expand Down Expand Up @@ -99,6 +100,8 @@ type
wakuStore*: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuStoreResume*: StoreResume
wakuStoreReconciliation*: SyncReconciliation
wakuStoreTransfer*: SyncTransfer
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuRlnRelay*: WakuRLNRelay
Expand Down Expand Up @@ -201,6 +204,35 @@ proc mountSharding*(
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
return ok()

## Waku Sync

proc mountStoreSync*(
node: WakuNode,
storeSyncRange = 3600,
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[ID](100)
let wantsChannel = newAsyncQueue[(PeerId, Fingerprint)](100)
let needsChannel = newAsyncQueue[(PeerId, Fingerprint)](100)

let recon =
?await SyncReconciliation.new(
node.peerManager, node.wakuArchive, storeSyncRange.seconds,
storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel,
needsChannel,
)

node.wakuStoreReconciliation = recon

let transfer = SyncTransfer.new(
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
)

node.wakuStoreTransfer = transfer

return ok()

## Waku relay

proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
Expand Down Expand Up @@ -231,12 +263,16 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =

await node.wakuArchive.handleMessage(topic, msg)

proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
node.wakuStoreReconciliation.messageIngress(topic, msg)

let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)

discard node.wakuRelay.subscribe(topic, defaultHandler)

Expand Down Expand Up @@ -1301,6 +1337,12 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.start()

if not node.wakuStoreReconciliation.isNil():
node.wakuStoreReconciliation.start()

if not node.wakuStoreTransfer.isNil():
node.wakuStoreTransfer.start()

## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
Expand Down Expand Up @@ -1340,6 +1382,12 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuStoreReconciliation.isNil():
await node.wakuStoreReconciliation.stopWait()

if not node.wakuStoreTransfer.isNil():
await node.wakuStoreTransfer.stopWait()

if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

Expand Down
10 changes: 6 additions & 4 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,18 @@ proc syncMessageIngress*(
msgHash: WakuMessageHash,
pubsubTopic: PubsubTopic,
msg: WakuMessage,
) {.async.} =
): Future[Result[void, string]] {.async.} =
let insertStartTime = getTime().toUnixFloat()

(await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
trace "failed to insert message",
msg_hash = msgHash.to0xHex(),
msg_hash = msgHash.toHex(),
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp,
error = error
return
error = $error
return err(error)

trace "message archived",
msg_hash = msgHash.to0xHex(),
Expand All @@ -144,6 +144,8 @@ proc syncMessageIngress*(
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)

return ok()

proc findMessages*(
self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
Expand Down
9 changes: 9 additions & 0 deletions waku/waku_store_sync.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{.push raises: [].}

import
./waku_store_sync/reconciliation,
./waku_store_sync/transfer,
./waku_store_sync/common,
./waku_store_sync/codec

export reconciliation, transfer, common, codec

0 comments on commit d9de358

Please sign in to comment.