diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index 6faaee2943..aa56ff2e51 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -1,6 +1,14 @@ import std/[options, random], chronos, chronicles -import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore +import + waku/[ + node/peer_manager, + waku_core, + waku_store_sync/common, + waku_store_sync/reconciliation, + waku_store_sync/transfer, + ], + ../testlib/wakucore randomize() @@ -12,7 +20,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash = return hash -#[ proc newTestWakuRecon*( +proc newTestWakuRecon*( switch: Switch, idsRx: AsyncQueue[SyncID], wantsTx: AsyncQueue[(PeerId, Fingerprint)], @@ -25,8 +33,8 @@ proc randomHash*(rng: var Rand): WakuMessageHash = wakuArchive = nil, relayJitter = 0.seconds, idsRx = idsRx, - wantsTx = wantsTx, - needsTx = needsTx, + localWantsTx = wantsTx, + remoteNeedsTx = needsTx, ) let proto = res.get() @@ -34,9 +42,9 @@ proc randomHash*(rng: var Rand): WakuMessageHash = proto.start() switch.mount(proto) - return proto ]# + return proto -#[ proc newTestWakuTransfer*( +proc newTestWakuTransfer*( switch: Switch, idsTx: AsyncQueue[SyncID], wantsRx: AsyncQueue[(PeerId, Fingerprint)], @@ -48,11 +56,11 @@ proc randomHash*(rng: var Rand): WakuMessageHash = peerManager = peerManager, wakuArchive = nil, idsTx = idsTx, - wantsRx = wantsRx, - needsRx = needsRx, + localWantsRx = wantsRx, + remoteNeedsRx = needsRx, ) proto.start() switch.mount(proto) - return proto ]# + return proto diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim new file mode 100644 index 0000000000..7ca783152f --- /dev/null +++ b/tests/waku_store_sync/test_protocol.nim @@ -0,0 +1,377 @@ +{.used.} + +import + std/[options, sets, random, math], + testutils/unittests, + chronos, + libp2p/crypto/crypto, + stew/byteutils + +import + ../../waku/[ + node/peer_manager, + waku_core, + waku_core/message, + waku_core/message/digest, + waku_store_sync/common, + waku_store_sync/storage/range_processing, + waku_store_sync/reconciliation, + waku_store_sync/transfer, + waku_archive/archive, + waku_archive/driver, + waku_archive/common, + ], + ../testlib/[wakucore, testasync], + ../waku_archive/archive_utils, + ./sync_utils + +suite "Waku Sync: reconciliation": + var serverSwitch {.threadvar.}: Switch + var clientSwitch {.threadvar.}: Switch + + var + idsChannel {.threadvar.}: AsyncQueue[SyncID] + localWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + + var server {.threadvar.}: SyncReconciliation + var client {.threadvar.}: SyncReconciliation + + var serverPeerInfo {.threadvar.}: RemotePeerInfo + var clientPeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + idsChannel = newAsyncQueue[SyncID]() + localWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() + + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) + + serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() + + asyncTeardown: + await allFutures(server.stop(), client.stop()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "sync 2 nodes both empty": + check: + idsChannel.len == 0 + localWants.len == 0 + remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + idsChannel.len == 0 + localWants.len == 0 + remoteNeeds.len == 0 + + asyncTest "sync 2 nodes empty client full server": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) + + server.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + server.messageIngress(hash3, msg3) + + check: + remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == false + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true + + asyncTest "sync 2 nodes full client empty server": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) + + client.messageIngress(hash1, msg1) + client.messageIngress(hash2, msg2) + client.messageIngress(hash3, msg3) + + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == false + remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == false + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true + + asyncTest "sync 2 nodes different hashes": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(DefaultPubsubTopic, msg3) + + server.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + client.messageIngress(hash1, msg1) + client.messageIngress(hash3, msg3) + + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false + localWants.contains((clientPeerInfo.peerId, hash3)) == false + localWants.contains((serverPeerInfo.peerId, hash2)) == false + + var syncRes = await client.storeSynchronization(some(serverPeerInfo)) + assert syncRes.isOk(), $syncRes.error + + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true + localWants.contains((clientPeerInfo.peerId, hash3)) == true + localWants.contains((serverPeerInfo.peerId, hash2)) == true + + asyncTest "sync 2 nodes same hashes": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + + server.messageIngress(hash1, msg1) + client.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + client.messageIngress(hash2, msg2) + + check: + localWants.len == 0 + remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check: + localWants.len == 0 + remoteNeeds.len == 0 + + asyncTest "sync 2 nodes 100K msgs 1 diff": + let msgCount = 100_000 + var diffIndex = rand(msgCount) + var diff: WakuMessageHash + + # the sync window is 1 hour, spread msg equally in that time + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 100_000) + + var timestamp = timeSlice.a + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + + if i != diffIndex: + client.messageIngress(hash, msg) + else: + diff = hash + + timestamp += Timestamp(part) + + check: + localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == false + remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check: + localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == true + remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true + + asyncTest "sync 2 nodes 10K msgs 1K diffs": + let msgCount = 10_000 + var diffCount = 1_000 + + var diffMsgHashes: HashSet[WakuMessageHash] + var randIndexes: HashSet[int] + + # Diffs + for i in 0 ..< diffCount: + var randInt = rand(0 ..< msgCount) + + #make sure we actually have the right number of diffs + while randInt in randIndexes: + randInt = rand(0 ..< msgCount) + + randIndexes.incl(randInt) + + # sync window is 1 hour, spread msg equally in that time + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 100_000) + + var timestamp = timeSlice.a + + for i in 0 ..< msgCount: + let + msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) + hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + + if i in randIndexes: + diffMsgHashes.incl(hash) + else: + client.messageIngress(hash, msg) + + timestamp += Timestamp(part) + continue + + check: + localWants.len == 0 + remoteNeeds.len == 0 + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + # timimg issue make it hard to match exact numbers + check: + localWants.len > 900 + remoteNeeds.len > 900 + +suite "Waku Sync: transfer": + var + serverSwitch {.threadvar.}: Switch + clientSwitch {.threadvar.}: Switch + + var + serverDriver {.threadvar.}: ArchiveDriver + clientDriver {.threadvar.}: ArchiveDriver + serverArchive {.threadvar.}: WakuArchive + clientArchive {.threadvar.}: WakuArchive + + var + serverIds {.threadvar.}: AsyncQueue[SyncID] + serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + clientIds {.threadvar.}: AsyncQueue[SyncID] + clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + + var + server {.threadvar.}: SyncTransfer + client {.threadvar.}: SyncTransfer + + var + serverPeerInfo {.threadvar.}: RemotePeerInfo + clientPeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + serverDriver = newSqliteArchiveDriver() + clientDriver = newSqliteArchiveDriver() + + serverArchive = newWakuArchive(serverDriver) + clientArchive = newWakuArchive(clientDriver) + + let + serverPeerManager = PeerManager.new(serverSwitch) + clientPeerManager = PeerManager.new(clientSwitch) + + serverIds = newAsyncQueue[SyncID]() + serverLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() + + server = SyncTransfer.new( + peerManager = serverPeerManager, + wakuArchive = serverArchive, + idsTx = serverIds, + localWantsRx = serverLocalWants, + remoteNeedsRx = serverRemoteNeeds, + ) + + clientIds = newAsyncQueue[SyncID]() + clientLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() + + client = SyncTransfer.new( + peerManager = clientPeerManager, + wakuArchive = clientArchive, + idsTx = clientIds, + localWantsRx = clientLocalWants, + remoteNeedsRx = clientRemoteNeeds, + ) + + server.start() + client.start() + + serverSwitch.mount(server) + clientSwitch.mount(client) + + serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() + + serverPeerManager.addPeer(clientPeerInfo) + clientPeermanager.addPeer(serverPeerInfo) + + asyncTeardown: + await allFutures(server.stopWait(), client.stopWait()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "transfer 1 message": + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + let msgs = @[msg] + + serverDriver = serverDriver.put(DefaultPubsubTopic, msgs) + + # add server info and msg hash to client want channel + let want = (serverPeerInfo.peerId, hash) + await clientLocalWants.put(want) + + # add client info and msg hash to server need channel + let need = (clientPeerInfo.peerId, hash) + await serverRemoteNeeds.put(need) + + # give time for transfer to happen + await sleepAsync(250.milliseconds) + + var query = ArchiveQuery() + query.includeData = true + query.hashes = @[hash] + + let res = await clientArchive.findMessages(query) + assert res.isOk(), $res.error + + check: + msg == res.get().messages[0] diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 914c7366db..b1f30d6bd3 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -122,7 +122,7 @@ proc syncMessageIngress*( msgHash: WakuMessageHash, pubsubTopic: PubsubTopic, msg: WakuMessage, -) {.async.} = +): Future[Result[void, string]] {.async.} = let insertStartTime = getTime().toUnixFloat() (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: @@ -133,7 +133,7 @@ proc syncMessageIngress*( contentTopic = msg.contentTopic, timestamp = msg.timestamp, error = error - return + return err("failed to insert message") trace "message archived", msg_hash = msgHash.to0xHex(), @@ -144,6 +144,8 @@ proc syncMessageIngress*( let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) + return ok() + proc findMessages*( self: WakuArchive, query: ArchiveQuery ): Future[ArchiveResult] {.async, gcsafe.} = diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index d91cca16cd..ee0b926a3a 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -243,7 +243,7 @@ proc getItemSet( proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = if buffer.len == 1: - return err("payload too small") + return ok(RangesData()) var payload = RangesData() diff --git a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim new file mode 100644 index 0000000000..2d2776674e --- /dev/null +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -0,0 +1,17 @@ +import metrics + +const + Reconciliation* = "reconciliation" + Transfer* = "transfer" + Receiving* = "receive" + Sending* = "sent" + +declarePublicHistogram reconciliation_roundtrips, + "the nubmer of roundtrips for each reconciliation", + buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf] + +declarePublicSummary total_bytes_exchanged, + "the number of bytes sent and received by the protocols", ["protocol", "direction"] + +declarePublicCounter total_transfer_messages_exchanged, + "the number of messages sent and received by the transfer protocol", ["direction"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim new file mode 100644 index 0000000000..d2ade2ab39 --- /dev/null +++ b/waku/waku_store_sync/reconciliation.nim @@ -0,0 +1,367 @@ +{.push raises: [].} + +import + std/sequtils, + stew/byteutils, + results, + chronicles, + chronos, + metrics, + libp2p/utility, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/protobuf, + ../common/paging, + ../waku_enr, + ../waku_core/codecs, + ../waku_core/time, + ../waku_core/topics/pubsub_topic, + ../waku_core/message/digest, + ../waku_core/message/message, + ../node/peer_manager/peer_manager, + ../waku_archive, + ./common, + ./codec, + ./storage/storage, + ./storage/seq_storage, + ./storage/range_processing, + ./protocols_metrics + +logScope: + topics = "waku reconciliation" + +const DefaultStorageCap = 50_000 + +type SyncReconciliation* = ref object of LPProtocol + peerManager: PeerManager + + wakuArchive: WakuArchive + + storage: SyncStorage + + # Receive IDs from transfer protocol for storage + idsRx: AsyncQueue[SyncID] + + # Send Hashes to transfer protocol for reception + localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)] + + # Send Hashes to transfer protocol for transmission + remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)] + + # params + syncInterval: timer.Duration # Time between each synchronization attempt + syncRange: timer.Duration # Amount of time in the past to sync + relayJitter: Duration # Amount of time since the present to ignore when syncing + + # futures + periodicSyncFut: Future[void] + periodicPruneFut: Future[void] + idsReceiverFut: Future[void] + +proc messageIngress*( + self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage +) = + let msgHash = computeMessageHash(pubsubTopic, msg) + + let id = SyncID(time: msg.timestamp, hash: msgHash) + + self.storage.insert(id).isOkOr: + error "failed to insert new message", msg_hash = msgHash.toHex(), err = error + +proc messageIngress*( + self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage +) = + let id = SyncID(time: msg.timestamp, hash: msgHash) + + self.storage.insert(id).isOkOr: + error "failed to insert new message", msg_hash = msgHash.toHex(), err = error + +proc messageIngress*(self: SyncReconciliation, id: SyncID) = + self.storage.insert(id).isOkOr: + error "failed to insert new message", msg_hash = id.hash.toHex(), err = error + +proc processRequest( + self: SyncReconciliation, conn: Connection +): Future[Result[void, string]] {.async.} = + var roundTrips = 0 + + while true: + let readRes = catch: + await conn.readLp(int.high) + + let buffer: seq[byte] = readRes.valueOr: + return err("connection read error: " & error.msg) + + total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving]) + + let recvPayload = RangesData.deltaDecode(buffer).valueOr: + return err("payload decoding error: " & error) + + roundTrips.inc() + + trace "sync payload received", + local = self.peerManager.switch.peerInfo.peerId, + remote = conn.peerId, + payload = recvPayload + + if recvPayload.ranges.len == 0 or recvPayload.ranges.allIt(it[1] == RangeType.Skip): + break + + var + hashToRecv: seq[WakuMessageHash] + hashToSend: seq[WakuMessageHash] + + let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) + + for hash in hashToSend: + await self.remoteNeedsTx.addLast((conn.peerId, hash)) + + for hash in hashToRecv: + await self.localWantstx.addLast((conn.peerId, hash)) + + let rawPayload = sendPayload.deltaEncode() + + total_bytes_exchanged.observe( + rawPayload.len, labelValues = [Reconciliation, Sending] + ) + + let writeRes = catch: + await conn.writeLP(rawPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + trace "sync payload sent", + local = self.peerManager.switch.peerInfo.peerId, + remote = conn.peerId, + payload = sendPayload + + if sendPayload.ranges.len == 0 or sendPayload.ranges.allIt(it[1] == RangeType.Skip): + break + + continue + + reconciliation_roundtrips.observe(roundTrips) + + await conn.close() + + return ok() + +proc initiate( + self: SyncReconciliation, connection: Connection +): Future[Result[void, string]] {.async.} = + let + timeRange = calculateTimeRange(self.relayJitter, self.syncRange) + lower = SyncID(time: timeRange.a, hash: EmptyFingerprint) + upper = SyncID(time: timeRange.b, hash: FullFingerprint) + bounds = lower .. upper + + fingerprint = self.storage.computeFingerprint(bounds) + initPayload = RangesData( + ranges: @[(bounds, RangeType.Fingerprint)], + fingerprints: @[fingerprint], + itemSets: @[], + ) + + let sendPayload = initPayload.deltaEncode() + + total_bytes_exchanged.observe( + sendPayload.len, labelValues = [Reconciliation, Sending] + ) + + let writeRes = catch: + await connection.writeLP(sendPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + trace "sync payload sent", + local = self.peerManager.switch.peerInfo.peerId, + remote = connection.peerId, + payload = sendPayload + + ?await self.processRequest(connection) + + return ok() + +proc storeSynchronization*( + self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) +): Future[Result[void, string]] {.async.} = + let peer = peerInfo.valueOr: + self.peerManager.selectPeer(WakuReconciliationCodec).valueOr: + return err("no suitable peer found for sync") + + let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec) + + let conn: Connection = connOpt.valueOr: + return err("cannot establish sync connection") + + debug "sync session initialized", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + (await self.initiate(conn)).isOkOr: + error "sync session failed", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error + + return err("sync request error: " & error) + + debug "sync session ended gracefully", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return ok() + +proc initFillStorage( + syncRange: timer.Duration, wakuArchive: WakuArchive +): Future[Result[seq[SyncID], string]] {.async.} = + if wakuArchive.isNil(): + return err("waku archive unavailable") + + let endTime = getNowInNanosecondTime() + let starTime = endTime - syncRange.nanos + + #TODO special query for only timestap and hash ??? + + var query = ArchiveQuery( + includeData: true, + cursor: none(ArchiveCursor), + startTime: some(starTime), + endTime: some(endTime), + pageSize: 100, + direction: PagingDirection.FORWARD, + ) + + debug "initial storage filling started" + + var ids = newSeq[SyncID](DefaultStorageCap) + + # we assume IDs are in order + + while true: + let response = (await wakuArchive.findMessages(query)).valueOr: + return err("archive retrival failed: " & $error) + + for i in 0 ..< response.hashes.len: + let hash = response.hashes[i] + let msg = response.messages[i] + + ids.add(SyncID(time: msg.timestamp, hash: hash)) + + if response.cursor.isNone(): + break + + query.cursor = response.cursor + + debug "initial storage filling done", elements = ids.len + + return ok(ids) + +proc new*( + T: type SyncReconciliation, + peerManager: PeerManager, + wakuArchive: WakuArchive, + syncRange: timer.Duration = DefaultSyncRange, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: timer.Duration = DefaultGossipSubJitter, + idsRx: AsyncQueue[SyncID], + localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)], + remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)], +): Future[Result[T, string]] {.async.} = + let res = await initFillStorage(syncRange, wakuArchive) + let storage = + if res.isErr(): + warn "will not sync messages before this point in time", error = res.error + SeqStorage.new(DefaultStorageCap) + else: + SeqStorage.new(res.get()) + + var sync = SyncReconciliation( + peerManager: peerManager, + storage: storage, + syncRange: syncRange, + syncInterval: syncInterval, + relayJitter: relayJitter, + idsRx: idsRx, + localWantsTx: localWantsTx, + remoteNeedsTx: remoteNeedsTx, + ) + + let handler = proc(conn: Connection, proto: string) {.async, closure.} = + (await sync.processRequest(conn)).isOkOr: + error "request processing error", error = error + + return + + sync.handler = handler + sync.codec = WakuReconciliationCodec + + info "Store Reconciliation protocol initialized" + + return ok(sync) + +proc periodicSync(self: SyncReconciliation) {.async.} = + debug "periodic sync initialized", interval = $self.syncInterval + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic sync started" + + (await self.storeSynchronization()).isOkOr: + error "periodic sync failed", err = error + continue + + debug "periodic sync done" + +proc periodicPrune(self: SyncReconciliation) {.async.} = + debug "periodic prune initialized", interval = $self.syncInterval + + # preventing sync and prune loops of happening at the same time. + await sleepAsync((self.syncInterval div 2)) + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic prune started" + + let time = getNowInNanosecondTime() - self.syncRange.nanos + + let count = self.storage.prune(time) + + debug "periodic prune done", elements_pruned = count + +proc idsReceiverLoop(self: SyncReconciliation) {.async.} = + while true: # infinite loop + let id = await self.idsRx.popfirst() + + self.messageIngress(id) + +proc start*(self: SyncReconciliation) = + if self.started: + return + + self.started = true + + if self.syncInterval > ZeroDuration: + self.periodicSyncFut = self.periodicSync() + + if self.syncInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune() + + self.idsReceiverFut = self.idsReceiverLoop() + + info "Store Sync Reconciliation protocol started" + +proc stopWait*(self: SyncReconciliation) {.async.} = + if self.syncInterval > ZeroDuration: + await self.periodicSyncFut.cancelAndWait() + + if self.syncInterval > ZeroDuration: + await self.periodicPruneFut.cancelAndWait() + + await self.idsReceiverFut.cancelAndWait() + + info "Store Sync Reconciliation protocol stopped" diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim new file mode 100644 index 0000000000..e384f04a51 --- /dev/null +++ b/waku/waku_store_sync/transfer.nim @@ -0,0 +1,224 @@ +{.push raises: [].} + +import + std/sets, + results, + chronicles, + chronos, + metrics, + libp2p/utility, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/protobuf, + ../waku_enr, + ../waku_core/codecs, + ../waku_core/time, + ../waku_core/topics/pubsub_topic, + ../waku_core/message/digest, + ../waku_core/message/message, + ../waku_core/message/default_values, + ../node/peer_manager/peer_manager, + ../waku_archive, + ../waku_archive/common, + ./common, + ./codec, + ./protocols_metrics + +logScope: + topics = "waku transfer" + +type SyncTransfer* = ref object of LPProtocol + wakuArchive: WakuArchive + peerManager: PeerManager + + # Send IDs to reconciliation protocol for storage + idsTx: AsyncQueue[SyncID] + + # Receive Hashes from reconciliation protocol for reception + localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)] + localWantsRxFut: Future[void] + inSessions: Table[PeerId, HashSet[WakuMessageHash]] + + # Receive Hashes from reconciliation protocol for transmission + remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)] + remoteNeedsRxFut: Future[void] + outSessions: Table[PeerId, Connection] + +proc sendMessage( + conn: Connection, payload: WakuMessageAndTopic +): Future[Result[void, string]] {.async.} = + let rawPayload = payload.encode().buffer + + total_bytes_exchanged.observe(rawPayload.len, labelValues = [Transfer, Sending]) + + let writeRes = catch: + await conn.writeLP(rawPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + total_transfer_messages_exchanged.inc(labelValues = [Sending]) + + return ok() + +proc openConnection( + self: SyncTransfer, peerId: PeerId +): Future[Result[Connection, string]] {.async.} = + let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec) + + let conn: Connection = connOpt.valueOr: + return err("Cannot establish transfer connection") + + debug "transfer session initialized", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return ok(conn) + +proc wantsReceiverLoop(self: SyncTransfer) {.async.} = + ## Waits for message hashes, + ## store the peers and hashes locally as + ## "supposed to be received" + + while true: # infinite loop + let (peerId, fingerprint) = await self.localWantsRx.popFirst() + + self.inSessions.withValue(peerId, value): + value[].incl(fingerprint) + do: + var hashes = initHashSet[WakuMessageHash]() + hashes.incl(fingerprint) + self.inSessions[peerId] = hashes + + return + +proc needsReceiverLoop(self: SyncTransfer) {.async.} = + ## Waits for message hashes, + ## open connection to the other peers, + ## get the messages from DB and then send them. + + while true: # infinite loop + let (peerId, fingerprint) = await self.remoteNeedsRx.popFirst() + + if not self.outSessions.hasKey(peerId): + let connection = (await self.openConnection(peerId)).valueOr: + error "failed to establish transfer connection", error = error + continue + + self.outSessions[peerid] = connection + + let connection = self.outSessions[peerId] + + var query = ArchiveQuery() + query.includeData = true + query.hashes = @[fingerprint] + + let response = (await self.wakuArchive.findMessages(query)).valueOr: + error "failed to query archive", error = error + continue + + let msg = + WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0]) + + (await sendMessage(connection, msg)).isOkOr: + error "failed to send message", error = error + continue + + return + +proc initProtocolHandler(self: SyncTransfer) = + let handler = proc(conn: Connection, proto: string) {.async, closure.} = + while true: + let readRes = catch: + await conn.readLp(int64(DefaultMaxWakuMessageSize)) + + let buffer: seq[byte] = readRes.valueOr: + # connection closed normally + break + + total_bytes_exchanged.observe(buffer.len, labelValues = [Transfer, Receiving]) + + let payload = WakuMessageAndTopic.decode(buffer).valueOr: + error "decoding error", error = $error + continue + + total_transfer_messages_exchanged.inc(labelValues = [Receiving]) + + let msg = payload.message + let pubsub = payload.pubsub + + let hash = computeMessageHash(pubsub, msg) + + self.inSessions.withValue(conn.peerId, value): + if value[].missingOrExcl(hash): + error "unwanted hash received, disconnecting" + self.inSessions.del(conn.peerId) + await conn.close() + break + do: + error "unwanted hash received, disconnecting" + self.inSessions.del(conn.peerId) + await conn.close() + break + + #TODO verify msg RLN proof... + + (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: + continue + + let id = SyncID(time: msg.timestamp, hash: hash) + await self.idsTx.addLast(id) + + continue + + debug "transfer session ended", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return + + self.handler = handler + self.codec = WakuTransferCodec + +proc new*( + T: type SyncTransfer, + peerManager: PeerManager, + wakuArchive: WakuArchive, + idsTx: AsyncQueue[SyncID], + localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)], + remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)], +): T = + var transfer = SyncTransfer( + peerManager: peerManager, + wakuArchive: wakuArchive, + idsTx: idsTx, + localWantsRx: localWantsRx, + remoteNeedsRx: remoteNeedsRx, + ) + + transfer.initProtocolHandler() + + info "Store Transfer protocol initialized" + + return transfer + +proc start*(self: SyncTransfer) = + if self.started: + return + + self.started = true + + self.localWantsRxFut = self.wantsReceiverLoop() + self.remoteNeedsRxFut = self.needsReceiverLoop() + + info "Store Sync Transfer protocol started" + +proc stopWait*(self: SyncTransfer) {.async.} = + self.started = false + + await self.localWantsRxFut.cancelAndWait() + await self.remoteNeedsRxFut.cancelAndWait() + + info "Store Sync Transfer protocol stopped"