diff --git a/tests/waku_store_sync/test_all.nim b/tests/waku_store_sync/test_all.nim new file mode 100644 index 0000000000..82daa388b7 --- /dev/null +++ b/tests/waku_store_sync/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_protocol, ./test_storage, ./test_codec diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 80734d0b86..f03f7eff78 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -358,6 +358,14 @@ proc setupProtocols( else: return err("failed to set node waku filter peer: " & filterNode.error) + if conf.storeSync: + ( + await node.mountStoreSync( + conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter + ) + ).isOkOr: + return err("failed to mount waku store sync protocol: " & $error) + # waku peer exchange setup if conf.peerExchange: try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 591962472b..8c297431cf 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -34,6 +34,7 @@ import ../waku_store/client as store_client, ../waku_store/common as store_common, ../waku_store/resume, + ../waku_store_sync, ../waku_filter_v2, ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, @@ -99,6 +100,8 @@ type wakuStore*: store.WakuStore wakuStoreClient*: store_client.WakuStoreClient wakuStoreResume*: StoreResume + wakuStoreReconciliation*: SyncReconciliation + wakuStoreTransfer*: SyncTransfer wakuFilter*: waku_filter_v2.WakuFilter wakuFilterClient*: filter_client.WakuFilterClient wakuRlnRelay*: WakuRLNRelay @@ -201,6 +204,35 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() +## Waku Sync + +proc mountStoreSync*( + node: WakuNode, + storeSyncRange = 3600, + storeSyncInterval = 300, + storeSyncRelayJitter = 20, +): Future[Result[void, string]] {.async.} = + let idsChannel = newAsyncQueue[ID](100) + let wantsChannel = newAsyncQueue[(PeerId, Fingerprint)](100) + let needsChannel = newAsyncQueue[(PeerId, Fingerprint)](100) + + let recon = + ?await SyncReconciliation.new( + node.peerManager, node.wakuArchive, storeSyncRange.seconds, + storeSyncInterval.seconds, storeSyncRelayJitter.seconds, idsChannel, wantsChannel, + needsChannel, + ) + + node.wakuStoreReconciliation = recon + + let transfer = SyncTransfer.new( + node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel + ) + + node.wakuStoreTransfer = transfer + + return ok() + ## Waku relay proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = @@ -231,12 +263,16 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await node.wakuArchive.handleMessage(topic, msg) + proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + node.wakuStoreReconciliation.messageIngress(topic, msg) + let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) + await syncHandler(topic, msg) discard node.wakuRelay.subscribe(topic, defaultHandler) @@ -1301,6 +1337,12 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.start() + if not node.wakuStoreReconciliation.isNil(): + node.wakuStoreReconciliation.start() + + if not node.wakuStoreTransfer.isNil(): + node.wakuStoreTransfer.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -1340,6 +1382,12 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.stopWait() + if not node.wakuStoreReconciliation.isNil(): + await node.wakuStoreReconciliation.stopWait() + + if not node.wakuStoreTransfer.isNil(): + await node.wakuStoreTransfer.stopWait() + if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 914c7366db..c357d71d0d 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -122,18 +122,18 @@ proc syncMessageIngress*( msgHash: WakuMessageHash, pubsubTopic: PubsubTopic, msg: WakuMessage, -) {.async.} = +): Future[Result[void, string]] {.async.} = let insertStartTime = getTime().toUnixFloat() (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) trace "failed to insert message", - msg_hash = msgHash.to0xHex(), + msg_hash = msgHash.toHex(), pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, - error = error - return + error = $error + return err(error) trace "message archived", msg_hash = msgHash.to0xHex(), @@ -144,6 +144,8 @@ proc syncMessageIngress*( let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) + return ok() + proc findMessages*( self: WakuArchive, query: ArchiveQuery ): Future[ArchiveResult] {.async, gcsafe.} = diff --git a/waku/waku_store_sync.nim b/waku/waku_store_sync.nim new file mode 100644 index 0000000000..6650de9dc9 --- /dev/null +++ b/waku/waku_store_sync.nim @@ -0,0 +1,9 @@ +{.push raises: [].} + +import + ./waku_store_sync/reconciliation, + ./waku_store_sync/transfer, + ./waku_store_sync/common, + ./waku_store_sync/codec + +export reconciliation, transfer, common, codec